Red Hat Bugzilla – Attachment 622397 Details for
Bug 863505
Backport CLEANALLRUV to RHEL6.3
[patch]
all 1.2.11 cleanallruv patches compressed into 1 patch file
0023-Ticket-337-RFE-Improve-CLEANRUV-functionality.patch (text/plain), 190.16 KB, created by
Rich Megginson
on 2012-10-05 19:36:35 UTC
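The new template-cleanallruv.pl script in this patch starts (or aborts) a cleanup by adding a task entry under cn=tasks, cn=config and piping it to ldapmodify. As a minimal sketch of that entry layout, the C helper below formats the same LDIF text. The function name `build_cleanallruv_ldif`, its parameters, and the buffer handling are hypothetical and are not part of the patch; only the DN layout and attribute names are taken from the script.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * Hypothetical helper (not part of the patch): format the LDIF task entry
 * that template-cleanallruv.pl builds before piping it to ldapmodify.
 *
 *   stamp      - timestamp suffix, e.g. "2012_10_5_19_36_35"
 *   basedn     - DN of the replica root to clean
 *   rid        - replica id to clean
 *   abort_task - nonzero builds the "abort cleanallruv" variant (-A option)
 *
 * Returns the number of characters written (excluding the terminating NUL),
 * or -1 if the buffer is too small or formatting failed.
 */
int build_cleanallruv_ldif(char *buf, size_t buflen, const char *stamp,
                           const char *basedn, unsigned int rid,
                           int abort_task)
{
    /* The task name prefix and the task container differ for an abort. */
    const char *prefix = abort_task ? "abort_cleanallruv" : "cleanallruv";
    const char *container = abort_task ? "abort cleanallruv" : "cleanallruv";

    int n = snprintf(buf, buflen,
                     "dn: cn=%s_%s, cn=%s, cn=tasks, cn=config\n"
                     "changetype: add\n"
                     "objectclass: top\n"
                     "objectclass: extensibleObject\n"
                     "cn: %s_%s\n"
                     "replica-base-dn: %s\n"
                     "replica-id: %u\n",
                     prefix, stamp, container, prefix, stamp, basedn, rid);

    if (n < 0 || (size_t)n >= buflen) {
        return -1; /* formatting error or output truncated */
    }
    return n;
}
```

A caller would pass a buffer of a few hundred bytes and hand the resulting text to an LDAP add operation, much as the script does with `ldapmodify -a`. Returning -1 on truncation is a design choice of this sketch, so that a partial entry is never sent to the server.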
>From af2feb4fd8e95539281c553b0567b52e6e9d6119 Mon Sep 17 00:00:00 2001
>From: root <root@localhost.localdomain>
>Date: Mon, 23 Apr 2012 13:36:04 -0400
>Subject: [PATCH] Ticket #337 - RFE - Improve CLEANRUV functionality
>
>Bug Description: Previously the steps to remove a replica and its RUV were problematic.
> I created two new "tasks" to take care of the entire replication environment.
>
>Fix Description:
>
>[1] The new task "CLEANALLRUV<rid>" - run it once on any master
>
> This marks the rid as invalid. The invalid rid is used to reject updates to the changelog and the database RUV.
> It then sends a "CLEANRUV" extended operation to each agreement.
> Then it cleans its own RUV.
>
> The CLEANRUV extended op then triggers that replica to send the same CLEANRUV extop to its replicas, then it cleans its own RID. Basically
> this operation cascades through the entire replication environment.
>
>[2] The "RELEASERUV<rid>" task - run it once on any master
>
> Once the RUVs have been cleaned on all the replicas, you need to "release" the rid so that it can be reused. This operation also cascades
> through the entire replication environment. This also triggers changelog trimming.
>
>For all of this to work correctly, there is a list of steps that needs to be followed. This procedure is attached to the ticket.
>
>https://fedorahosted.org/389/ticket/337
>
>reviewed by: ?
>(cherry picked from commit 0f50544b9567907edd0ba645951d7cd325354107) >(cherry picked from commit 01b596e37474f196249a40387917bee002c783e8) >--- > Makefile.am | 1 + > Makefile.in | 1 + > ldap/admin/src/scripts/template-cleanallruv.pl.in | 186 ++ > ldap/schema/01core389.ldif | 7 +- > ldap/servers/plugins/replication/cl5_api.c | 134 +- > ldap/servers/plugins/replication/cl5_api.h | 11 +- > ldap/servers/plugins/replication/repl5.h | 61 +- > ldap/servers/plugins/replication/repl5_agmt.c | 122 ++ > ldap/servers/plugins/replication/repl5_agmtlist.c | 4 + > .../servers/plugins/replication/repl5_connection.c | 39 +- > .../plugins/replication/repl5_inc_protocol.c | 21 +- > ldap/servers/plugins/replication/repl5_init.c | 89 +- > ldap/servers/plugins/replication/repl5_plugins.c | 8 +- > ldap/servers/plugins/replication/repl5_replica.c | 498 ++++-- > .../plugins/replication/repl5_replica_config.c | 1972 ++++++++++++++++++-- > ldap/servers/plugins/replication/repl5_ruv.c | 108 +- > ldap/servers/plugins/replication/repl5_ruv.h | 2 + > ldap/servers/plugins/replication/repl_extop.c | 377 ++++- > ldap/servers/plugins/replication/repl_globals.c | 5 +- > ldap/servers/slapd/log.c | 27 + > ldap/servers/slapd/slapi-plugin.h | 3 + > ldap/servers/slapd/task.c | 56 + > 22 files changed, 3398 insertions(+), 334 deletions(-) > create mode 100644 ldap/admin/src/scripts/template-cleanallruv.pl.in > >diff --git a/Makefile.am b/Makefile.am >index bb761e3..e67f9fa 100644 >--- a/Makefile.am >+++ b/Makefile.am >@@ -371,6 +371,7 @@ task_SCRIPTS = ldap/admin/src/scripts/template-bak2db \ > ldap/admin/src/scripts/template-db2ldif.pl \ > ldap/admin/src/scripts/template-fixup-linkedattrs.pl \ > ldap/admin/src/scripts/template-fixup-memberof.pl \ >+ ldap/admin/src/scripts/template-cleanallruv.pl \ > ldap/admin/src/scripts/template-ldif2db.pl \ > ldap/admin/src/scripts/template-ns-accountstatus.pl \ > ldap/admin/src/scripts/template-ns-activate.pl \ >diff --git a/Makefile.in b/Makefile.in >index 
88ab46f..efa370b 100644 >--- a/Makefile.in >+++ b/Makefile.in >@@ -1575,6 +1575,7 @@ task_SCRIPTS = ldap/admin/src/scripts/template-bak2db \ > ldap/admin/src/scripts/template-db2ldif.pl \ > ldap/admin/src/scripts/template-fixup-linkedattrs.pl \ > ldap/admin/src/scripts/template-fixup-memberof.pl \ >+ ldap/admin/src/scripts/template-cleanallruv.pl \ > ldap/admin/src/scripts/template-ldif2db.pl \ > ldap/admin/src/scripts/template-ns-accountstatus.pl \ > ldap/admin/src/scripts/template-ns-activate.pl \ >diff --git a/ldap/admin/src/scripts/template-cleanallruv.pl.in b/ldap/admin/src/scripts/template-cleanallruv.pl.in >new file mode 100644 >index 0000000..be95a6d >--- /dev/null >+++ b/ldap/admin/src/scripts/template-cleanallruv.pl.in >@@ -0,0 +1,186 @@ >+#{{PERL-EXEC}} >+# >+# BEGIN COPYRIGHT BLOCK >+# This Program is free software; you can redistribute it and/or modify it under >+# the terms of the GNU General Public License as published by the Free Software >+# Foundation; version 2 of the License. >+# >+# This Program is distributed in the hope that it will be useful, but WITHOUT >+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS >+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. >+# >+# You should have received a copy of the GNU General Public License along with >+# this Program; if not, write to the Free Software Foundation, Inc., 59 Temple >+# Place, Suite 330, Boston, MA 02111-1307 USA. >+# >+# In addition, as a special exception, Red Hat, Inc. gives You the additional >+# right to link the code of this Program with code not covered under the GNU >+# General Public License ("Non-GPL Code") and to distribute linked combinations >+# including the two, subject to the limitations in this paragraph. 
Non-GPL Code >+# permitted under this exception must only link to the code of this Program >+# through those well defined interfaces identified in the file named EXCEPTION >+# found in the source code files (the "Approved Interfaces"). The files of >+# Non-GPL Code may instantiate templates or use macros or inline functions from >+# the Approved Interfaces without causing the resulting work to be covered by >+# the GNU General Public License. Only Red Hat, Inc. may make changes or >+# additions to the list of Approved Interfaces. You must obey the GNU General >+# Public License in all respects for all of the Program code and other code used >+# in conjunction with the Program except the Non-GPL Code covered by this >+# exception. If you modify this file, you may extend this exception to your >+# version of the file, but you are not obligated to do so. If you do not wish to >+# provide this exception without modification, you must delete this exception >+# statement from your version and license this file solely under the GPL without >+# exception. >+# >+# >+# Copyright (C) 2001 Sun Microsystems, Inc. Used by permission. >+# Copyright (C) 2012 Red Hat, Inc. >+# All rights reserved. 
>+# END COPYRIGHT BLOCK >+# >+ >+sub usage { >+ print(STDERR "Usage: $0 [-v] -D rootdn { -w password | -w - | -j filename } \n"); >+ print(STDERR " [-b basedn | -r rid | -A]\n"); >+ print(STDERR " Opts: -D rootdn - Directory Manager\n"); >+ print(STDERR " : -w password - Directory Manager's password\n"); >+ print(STDERR " : -w - - Prompt for Directory Manager's password\n"); >+ print(STDERR " : -j filename - Read Directory Manager's password from file\n"); >+ print(STDERR " : -b basedn - DN of the replica root you want to clean\n"); >+ print(STDERR " : -r rid - The replica id that you want to clean\n"); >+ print(STDERR " : -A - Abort an existing cleanallruv task(must use with -b and -r args\n"); >+ print(STDERR " : -v - verbose\n"); >+} >+ >+$rootdn = ""; >+$passwd = ""; >+$passwdfile = ""; >+$basedn = ""; >+$rid = ""; >+$abort = ""; >+$verbose = 0; >+ >+$prefix = "{{DS-ROOT}}"; >+ >+$ENV{'PATH'} = "$prefix@ldaptool_bindir@:$prefix/usr/bin:@ldaptool_bindir@:/usr/bin"; >+ >+libpath_add("$prefix@nss_libdir@"); >+libpath_add("$prefix/usr/lib"); >+libpath_add("@nss_libdir@"); >+libpath_add("/usr/lib"); >+ >+$ENV{'SHLIB_PATH'} = "$ENV{'LD_LIBRARY_PATH'}"; >+ >+$i = 0; >+while ($i <= $#ARGV) >+{ >+ if ("$ARGV[$i]" eq "-b") >+ { >+ # Base DN >+ $i++; $basedn = $ARGV[$i]; >+ } >+ elsif ("$ARGV[$i]" eq "-r") >+ { >+ # rid >+ $i++; $rid = $ARGV[$i]; >+ } >+ elsif ("$ARGV[$i]" eq "-A") >+ { >+ # abort >+ $abort = "yes"; >+ } >+ elsif ("$ARGV[$i]" eq "-D") >+ { >+ # Directory Manager >+ $i++; $rootdn = $ARGV[$i]; >+ } >+ elsif ("$ARGV[$i]" eq "-w") >+ { >+ # Directory Manager's password >+ $i++; $passwd = $ARGV[$i]; >+ } >+ elsif ("$ARGV[$i]" eq "-j") >+ { >+ # Read Directory Manager's password from a file >+ $i++; $passwdfile = $ARGV[$i]; >+ } >+ elsif ("$ARGV[$i]" eq "-v") >+ { >+ # verbose >+ $verbose = 1; >+ } >+ else >+ { >+ &usage; exit(1); >+ } >+ $i++; >+} >+ >+if ($passwdfile ne ""){ >+# Open file and get the password >+ unless (open (RPASS, $passwdfile)) { >+ die 
"Error, cannot open password file $passwdfile\n"; >+ } >+ $passwd = <RPASS>; >+ chomp($passwd); >+ close(RPASS); >+} elsif ($passwd eq "-"){ >+# Read the password from terminal >+ print "Bind Password: "; >+ # Disable console echo >+ system("@sttyexec@ -echo") if -t STDIN; >+ # read the answer >+ $passwd = <STDIN>; >+ # Enable console echo >+ system("@sttyexec@ echo") if -t STDIN; >+ print "\n"; >+ chop($passwd); # trim trailing newline >+} >+ >+if ( $rootdn eq "" || $passwd eq "" || $basedn eq "" || $rid eq "") >+{ >+ &usage; >+ exit(1); >+} >+ >+$vstr = ""; >+if ($verbose != 0) >+{ >+ $vstr = "-v"; >+} >+ >+# Use a timestamp as part of the task entry name >+($s, $m, $h, $dy, $mn, $yr, $wdy, $ydy, $r) = localtime(time); >+$mn++; $yr += 1900; >+ >+if($abort eq ""){ >+ # Build the task entry to add >+ $taskname = "cleanallruv_${yr}_${mn}_${dy}_${h}_${m}_${s}"; >+ $dn = "dn: cn=$taskname, cn=cleanallruv, cn=tasks, cn=config\n"; >+} else { >+ $taskname = "abort_cleanallruv_${yr}_${mn}_${dy}_${h}_${m}_${s}"; >+ $dn = "dn: cn=$taskname, cn=abort cleanallruv, cn=tasks, cn=config\n"; >+} >+$misc = "changetype: add\nobjectclass: top\nobjectclass: extensibleObject\n"; >+$cn = "cn: $taskname\n"; >+$basedn = "replica-base-dn: $basedn\n"; >+$rid = "replica-id: $rid\n"; >+ >+ >+$entry = "${dn}${misc}${cn}${basedn}${rid}"; >+open(FOO, "| ldapmodify @ldaptool_opts@ $vstr -h {{SERVER-NAME}} -p {{SERVER-PORT}} -D \"$rootdn\" -w \"$passwd\" -a" ); >+print(FOO "$entry"); >+close(FOO); >+ >+sub libpath_add { >+ my $libpath = shift; >+ >+ if ($libpath) { >+ if ($ENV{'LD_LIBRARY_PATH'}) { >+ $ENV{'LD_LIBRARY_PATH'} = "$ENV{'LD_LIBRARY_PATH'}:$libpath"; >+ } else { >+ $ENV{'LD_LIBRARY_PATH'} = "$libpath"; >+ } >+ } >+} >+ >diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif >index cf30ab0..2a99c1a 100644 >--- a/ldap/schema/01core389.ldif >+++ b/ldap/schema/01core389.ldif >@@ -137,6 +137,9 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2111 NAME 'tombstoneNumSubordinates' > 
NO-USER-MODIFICATION > USAGE directoryOperation > X-ORIGIN '389 directory server' ) >+attributeTypes: ( 2.16.840.1.113730.3.1.2135 NAME 'nsds5ReplicaCleanRUV' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Netscape Directory Server' ) >+attributeTypes: ( 2.16.840.1.113730.3.1.2136 NAME 'nsds5ReplicaCleanRUVNotified' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 X-ORIGIN 'Netscape Directory Server' ) >+attributeTypes: ( 2.16.840.1.113730.3.1.2137 NAME 'nsds5ReplicaAbortCleanRUV' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Netscape Directory Server' ) > # > # objectclasses > # >@@ -146,9 +149,9 @@ objectClasses: ( 2.16.840.1.113730.3.2.44 NAME 'nsIndex' DESC 'Netscape defined > objectClasses: ( 2.16.840.1.113730.3.2.109 NAME 'nsBackendInstance' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) >-objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer) X-ORIGIN 'Netscape Directory Server' ) >+objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ 
nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' ) >-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5replicaSessionPauseTime ) X-ORIGIN 'Netscape Directory Server' ) >+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ 
nsds5replicaLastInitEnd $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5replicaSessionPauseTime ) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) X-ORIGIN 'Netscape Directory Server' ) > objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' ) >diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c >index 0ea90d1..0184404 100644 >--- a/ldap/servers/plugins/replication/cl5_api.c >+++ b/ldap/servers/plugins/replication/cl5_api.c >@@ -339,14 +339,15 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB > static int _cl5TrimInit (); > static void _cl5TrimCleanup (); > static int _cl5TrimMain (void *param); >-static void _cl5DoTrimming (); >-static void _cl5TrimFile (Object *obj, long *numToTrim); >+static void _cl5DoTrimming (ReplicaId rid); >+static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid); > static PRBool _cl5CanTrim (time_t time, long *numToTrim); > static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge); > static int _cl5WriteRUV (CL5DBFile *file, PRBool purge); > static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge); > static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge); > static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv); >+void 
trigger_cl_trimming_thread(void *rid); > > /* bakup/recovery, import/export */ > static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op, >@@ -692,12 +693,12 @@ int cl5DeleteDBSync (Object *replica) > } > > /* Name: cl5GetUpperBoundRUV >- Description: retrieves vector for that represnts the upper bound of the changes for a replica. >+ Description: retrieves vector for that represents the upper bound of the changes for a replica. > Parameters: r - replica for which the purge vector is requested > ruv - contains a copy of the purge ruv if function is successful; >- unchanged otherwise. It is responsobility pf the caller to free >+ unchanged otherwise. It is responsibility of the caller to free > the ruv when it is no longer is in use >- Return: CL5_SUCCESS if function is successfull >+ Return: CL5_SUCCESS if function is successful > CL5_BAD_STATE if the changelog is not initialized; > CL5_BAD_DATA - if NULL id is supplied > CL5_NOTFOUND, if changelog file for replica is not found >@@ -1675,6 +1676,16 @@ cl5GetNextOperationToReplay (CL5ReplayIterator *iterator, CL5Entry *entry) > return CL5_DB_ERROR; > } > >+ if(is_cleaned_rid(csn_get_replicaid(csn))){ >+ /* >+ * This operation is from a deleted replica. During the cleanallruv task the >+ * replicas are cleaned first before this instance is. This can cause the >+ * server to basically do a full update over and over. So we have to watch for >+ * this, and not send these operations out. 
>+ */ >+ return CL5_IGNORE_OP; >+ } >+ > /* there is an entry we should return */ > /* Callers of this function should cl5_operation_parameters_done(op) */ > if ( 0 != cl5DBData2Entry ( data, datalen, entry ) ) >@@ -3376,7 +3387,7 @@ static int _cl5TrimMain (void *param) > { > /* time to trim */ > timePrev = timeNow; >- _cl5DoTrimming (); >+ _cl5DoTrimming (0 /* there's no cleaned rid */); > } > if (NULL == s_cl5Desc.clLock) > { >@@ -3410,7 +3421,7 @@ static int _cl5TrimMain (void *param) > > */ > >-static void _cl5DoTrimming () >+static void _cl5DoTrimming (ReplicaId rid) > { > Object *obj; > long numToTrim; >@@ -3423,7 +3434,7 @@ static void _cl5DoTrimming () > obj = objset_first_obj (s_cl5Desc.dbFiles); > while (obj && _cl5CanTrim ((time_t)0, &numToTrim)) > { >- _cl5TrimFile (obj, &numToTrim); >+ _cl5TrimFile (obj, &numToTrim, rid); > obj = objset_next_obj (s_cl5Desc.dbFiles, obj); > } > >@@ -3440,12 +3451,13 @@ static void _cl5DoTrimming () > */ > #define CL5_TRIM_MAX_PER_TRANSACTION 10 > >-static void _cl5TrimFile (Object *obj, long *numToTrim) >+static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) > { > DB_TXN *txnid; > RUV *ruv = NULL; > CL5Entry entry; > slapi_operation_parameters op = {0}; >+ ReplicaId csn_rid; > void *it; > int finished = 0, totalTrimmed = 0, count; > PRBool abort; >@@ -3489,13 +3501,13 @@ static void _cl5TrimFile (Object *obj, long *numToTrim) > * This change can be trimmed if it exceeds purge > * parameters and has been seen by all consumers. 
> */ >+ csn_rid = csn_get_replicaid (op.csn); > if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) && > ruv_covers_csn_strict (ruv, op.csn) ) >- { >+ { > rc = _cl5CurrentDeleteEntry (it); >- if ( rc == CL5_SUCCESS ) >+ if ( rc == CL5_SUCCESS && cleaned_rid != csn_rid) > { >- /* update purge vector */ > rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE); > } > if ( rc == CL5_SUCCESS) >@@ -3521,10 +3533,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim) > * the trim forever. > */ > CSN *maxcsn = NULL; >- ReplicaId rid; >- >- rid = csn_get_replicaid (op.csn); >- ruv_get_largest_csn_for_replica (ruv, rid, &maxcsn); >+ ruv_get_largest_csn_for_replica (ruv, csn_rid, &maxcsn); > if ( csn_compare (op.csn, maxcsn) != 0 ) > { > /* op.csn is not anchor CSN */ >@@ -3612,10 +3621,10 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim) > *numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries; > return ( *numToTrim > 0 ); > } >- >+ > if (s_cl5Desc.dbTrim.maxEntries > 0 && > (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) >- return PR_TRUE; >+ return PR_TRUE; > > if (time) > return (current_time () - time > s_cl5Desc.dbTrim.maxAge); >@@ -3797,6 +3806,7 @@ static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge) > void *iterator = NULL; > slapi_operation_parameters op = {0}; > CL5DBFile *file; >+ ReplicaId rid; > > PR_ASSERT (replGen && obj); > >@@ -3820,6 +3830,15 @@ static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge) > rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); > while (rc == CL5_SUCCESS) > { >+ rid = csn_get_replicaid (op.csn); >+ if(is_cleaned_rid(rid)){ >+ /* skip this entry as the rid is invalid */ >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: " >+ "skipping entry because its csn contains a cleaned rid(%d)\n", rid); >+ cl5_operation_parameters_done (&op); >+ rc = _cl5GetNextEntry (&entry, iterator); >+ continue; >+ } > 
if (purge) > rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn); > else >@@ -3868,28 +3887,30 @@ static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge > > file = (CL5DBFile*)object_get_data (obj); > >- /* if purge is TRUE, file->purgeRUV must be set; >- if purge is FALSE, maxRUV must be set */ >+ /* >+ * if purge is TRUE, file->purgeRUV must be set; >+ * if purge is FALSE, maxRUV must be set >+ */ > PR_ASSERT (file && ((purge && file->purgeRUV) || (!purge && file->maxRUV))); >+ rid = csn_get_replicaid(csn); > > /* update vector only if this replica is not yet part of RUV */ > if (purge && newReplica) > { >- rid = csn_get_replicaid(csn); >- if (ruv_contains_replica (file->purgeRUV, rid)) >+ if (ruv_contains_replica (file->purgeRUV, rid)){ > return CL5_SUCCESS; >- else >- { >- /* if the replica is not part of the purgeRUV yet, add it */ >- ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl()); >+ } else { >+ /* if the replica is not part of the purgeRUV yet, add it unless it's from a cleaned rid */ >+ ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl()); > } > } > else > { >- if (purge) >+ if (purge){ > rc = ruv_set_csns(file->purgeRUV, csn, NULL); >- else >- rc = ruv_set_csns(file->maxRUV, csn, NULL); >+ } else { >+ rc = ruv_set_csns(file->maxRUV, csn, NULL); >+ } > } > > if (rc != RUV_SUCCESS) >@@ -4762,7 +4783,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_ > rc, db_strerror(rc)); > rc = CL5_DB_ERROR; > >-done:; >+done: > /* error occured */ > /* We didn't success in assigning this cursor to the iterator, > * so we need to free the cursor here */ >@@ -5055,7 +5076,7 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum > * legacy consumer. In this case the supplier > * and the consumer may have the same RID. 
> */ >- if (rid == consumerRID && rid != MAX_REPLICA_ID) >+ if ((rid == consumerRID && rid != MAX_REPLICA_ID) || (is_cleaned_rid(rid)) ) > continue; > > startCSN = csns[i]; >@@ -6504,3 +6525,54 @@ bail: > changelog5_config_done(&config); > return rc; > } >+ >+/* >+ * Clean the in memory RUV, at shutdown we will write the update to the db >+ */ >+void >+cl5CleanRUV(ReplicaId rid){ >+ CL5DBFile *file; >+ Object *obj; >+ >+ obj = objset_first_obj(s_cl5Desc.dbFiles); >+ while (obj){ >+ file = (CL5DBFile *)object_get_data(obj); >+ ruv_delete_replica(file->purgeRUV, rid); >+ ruv_delete_replica(file->maxRUV, rid); >+ obj = objset_next_obj(s_cl5Desc.dbFiles, obj); >+ } >+} >+ >+void trigger_cl_trimming(ReplicaId rid){ >+ PRThread *trim_tid = NULL; >+ >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid (%d)\n",(int)rid); >+ trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread, >+ (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE); >+ if (NULL == trim_tid){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, >+ "trigger_cl_trimming: failed to create trimming " >+ "thread; NSPR error - %d\n", PR_GetError ()); >+ } else { >+ /* need a little time for the thread to get started */ >+ DS_Sleep(PR_SecondsToInterval(1)); >+ } >+} >+ >+void >+trigger_cl_trimming_thread(void *arg){ >+ ReplicaId rid = *(ReplicaId *)arg; >+ >+ /* make sure we have a change log, and we aren't closing it */ >+ if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){ >+ return; >+ } >+ if (CL5_SUCCESS != _cl5AddThread()) { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, >+ "trigger_cl_trimming: failed to increment thread count " >+ "NSPR error - %d\n", PR_GetError ()); >+ } >+ _cl5DoTrimming(rid); >+ _cl5RemoveThread(); >+} >diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h >index 6f2552f..9b285ca 100644 
>--- a/ldap/servers/plugins/replication/cl5_api.h >+++ b/ldap/servers/plugins/replication/cl5_api.h >@@ -145,9 +145,10 @@ enum > CL5_CSN_ERROR, /* CSN API failed */ > CL5_RUV_ERROR, /* RUV API failed */ > CL5_OBJSET_ERROR, /* namedobjset api failed */ >- CL5_PURGED_DATA, /* requested data has been purged */ >- CL5_MISSING_DATA, /* data should be in the changelog, but is missing */ >- CL5_UNKNOWN_ERROR /* unclassified error */ >+ CL5_PURGED_DATA, /* requested data has been purged */ >+ CL5_MISSING_DATA, /* data should be in the changelog, but is missing */ >+ CL5_UNKNOWN_ERROR, /* unclassified error */ >+ CL5_IGNORE_OP /* ignore this updated - used by CLEANALLRUV task */ > }; > > /***** Module APIs *****/ >@@ -487,4 +488,8 @@ int cl5WriteRUV(); > Return: TRUE > */ > int cl5DeleteRUV(); >+void cl5CleanRUV(ReplicaId rid); >+void cl5NotifyCleanup(int rid); >+void trigger_cl_trimming(ReplicaId rid); >+ > #endif >diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h >index ac2cd88..1637057 100644 >--- a/ldap/servers/plugins/replication/repl5.h >+++ b/ldap/servers/plugins/replication/repl5.h >@@ -94,7 +94,11 @@ > * new set of start and response extops. 
*/ > #define REPL_START_NSDS90_REPLICATION_REQUEST_OID "2.16.840.1.113730.3.5.12" > #define REPL_NSDS90_REPLICATION_RESPONSE_OID "2.16.840.1.113730.3.5.13" >- >+/* cleanallruv extended ops */ >+#define REPL_CLEANRUV_OID "2.16.840.1.113730.3.6.5" >+#define REPL_ABORT_CLEANRUV_OID "2.16.840.1.113730.3.6.6" >+#define CLEANRUV_NOTIFIED 0 >+#define CLEANRUV_RELEASED 1 > > /* DS 5.0 replication protocol error codes */ > #define NSDS50_REPL_REPLICA_READY 0x00 /* Replica ready, go ahead */ >@@ -152,6 +156,7 @@ extern const char *type_nsds5ReplicaInitialize; > extern const char *type_nsds5ReplicaTimeout; > extern const char *type_nsds5ReplicaBusyWaitTime; > extern const char *type_nsds5ReplicaSessionPauseTime; >+extern const char *type_nsds5ReplicaCleanRUVnotified; > > /* Attribute names for windows replication agreements */ > extern const char *type_nsds7WindowsReplicaArea; >@@ -180,6 +185,8 @@ extern const char *type_replicaPurgeDelay; > extern const char *type_replicaChangeCount; > extern const char *type_replicaTombstonePurgeInterval; > extern const char *type_replicaLegacyConsumer; >+extern const char *type_replicaCleanRUV; >+extern const char *type_replicaAbortCleanRUV; > extern const char *type_ruvElementUpdatetime; > > /* multimaster plugin points */ >@@ -218,6 +225,8 @@ char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn); > /* In repl_extop.c */ > int multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb); > int multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb); >+int multimaster_extop_cleanruv(Slapi_PBlock *pb); >+int multimaster_extop_abort_cleanruv(Slapi_PBlock *pb); > int extop_noop(Slapi_PBlock *pb); > struct berval *NSDS50StartReplicationRequest_new(const char *protocol_oid, > const char *repl_root, char **extra_referrals, CSN *csn); >@@ -345,9 +354,12 @@ char **agmt_get_fractional_attrs_total(const Repl_Agmt *ra); > char **agmt_validate_replicated_attributes(Repl_Agmt *ra, int total); > void* agmt_get_priv (const 
Repl_Agmt *agmt); > void agmt_set_priv (Repl_Agmt *agmt, void* priv); >- > int get_agmt_agreement_type ( Repl_Agmt *agmt); >+void* agmt_get_connection( Repl_Agmt *ra); > int agmt_has_protocol(Repl_Agmt *agmt); >+void agmt_set_cleanruv_notified_from_entry(Repl_Agmt *ra, Slapi_Entry *e); >+int agmt_set_cleanruv_data(Repl_Agmt *ra, ReplicaId rid, int op); >+int agmt_is_cleanruv_notified(Repl_Agmt *ra, ReplicaId rid); > > typedef struct replica Replica; > >@@ -358,7 +370,6 @@ void agmtlist_notify_all(Slapi_PBlock *pb); > Object* agmtlist_get_first_agreement_for_replica (Replica *r); > Object* agmtlist_get_next_agreement_for_replica (Replica *r, Object *prev); > >- > /* In repl5_backoff.c */ > typedef struct backoff_timer Backoff_Timer; > #define BACKOFF_FIXED 1 >@@ -429,6 +440,10 @@ long conn_get_timeout(Repl_Connection *conn); > void conn_set_agmt_changed(Repl_Connection *conn); > ConnResult conn_read_result(Repl_Connection *conn, int *message_id); > ConnResult conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retdatap, LDAPControl ***returned_controls, int send_msgid, int *resp_msgid, int noblock); >+LDAP * conn_get_ldap(Repl_Connection *conn); >+void conn_lock(Repl_Connection *conn); >+void conn_unlock(Repl_Connection *conn); >+void conn_delete_internal_ext(Repl_Connection *conn); > > /* In repl5_protocol.c */ > typedef struct repl_protocol Repl_Protocol; >@@ -540,6 +555,9 @@ void replica_set_tombstone_reap_interval (Replica *r, long interval); > void replica_update_ruv_consumer (Replica *r, RUV *supplier_ruv); > void replica_set_ruv_dirty (Replica *r); > void replica_write_ruv (Replica *r); >+char *replica_get_dn(Replica *r); >+void replica_check_for_tasks(Replica*r, Slapi_Entry *e); >+ > /* The functions below handles the state flag */ > /* Current internal state flags */ > /* The replica can be busy and not other flag, >@@ -579,6 +597,43 @@ void multimaster_be_state_change (void *handle, char *be_name, int old_be_state, > int 
replica_config_init(); > void replica_config_destroy (); > int get_replica_type(Replica *r); >+int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid); >+void add_cleaned_rid(ReplicaId rid, Replica *r, char *maxcsn); >+int is_cleaned_rid(ReplicaId rid); >+int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, >+ int *returncode, char *returntext, void *arg); >+void replica_cleanallruv_thread_ext(void *arg); >+void stop_ruv_cleaning(); >+int task_aborted(); >+void replica_abort_task_thread(void *arg); >+void delete_cleaned_rid(Replica *r, ReplicaId rid, CSN *maxcsn); >+int process_repl_agmts(Replica *replica, int *agmt_info, char *oid, Slapi_Task *task, struct berval *payload, int op); >+int decode_cleanruv_payload(struct berval *extop_value, char **payload); >+struct berval *create_ruv_payload(char *value); >+void replica_add_cleanruv_data(Replica *r, char *val); >+void replica_remove_cleanruv_data(Replica *r, char *val); >+CSN *replica_get_cleanruv_maxcsn(Replica *r, ReplicaId rid); >+void ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids); >+void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root); >+int is_task_aborted(ReplicaId rid); >+void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root); >+void set_cleaned_rid(ReplicaId rid); >+void cleanruv_log(Slapi_Task *task, char *task_type, char *fmt, ...); >+ >+#define CLEANRIDSIZ 4 /* maximum number for concurrent CLEANALLRUV tasks */ >+ >+typedef struct _cleanruv_data >+{ >+ Object *repl_obj; >+ Replica *replica; >+ ReplicaId rid; >+ Slapi_Task *task; >+ struct berval *payload; >+ CSN *maxcsn; >+ char *repl_root; >+ Slapi_DN *sdn; >+ char *certify; >+} cleanruv_data; > > /* replutil.c */ > LDAPControl* create_managedsait_control (); >diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c >index 1d9affa..946e7f7 100644 >--- a/ldap/servers/plugins/replication/repl5_agmt.c >+++ 
b/ldap/servers/plugins/replication/repl5_agmt.c >@@ -138,6 +138,7 @@ typedef struct repl5agmt { > for sync agreements or for replication session plug-in > private data for normal replication agreements */ > int agreement_type; >+ int cleanruv_notified[CLEANRIDSIZ + 1]; /* specifies if the replica has been notified of a CLEANALLRUV task */ > } repl5agmt; > > /* Forward declarations */ >@@ -228,6 +229,7 @@ agmt_new_from_entry(Slapi_Entry *e) > Repl_Agmt *ra; > char *tmpstr; > Slapi_Attr *sattr; >+ char **clean_vals = NULL; > char **denied_attrs = NULL; > > char *auto_initialize = NULL; >@@ -387,6 +389,20 @@ agmt_new_from_entry(Slapi_Entry *e) > ra->last_init_start_time = 0UL; > ra->last_init_status[0] = '\0'; > >+ /* cleanruv notification */ >+ clean_vals = slapi_entry_attr_get_charray(e, type_nsds5ReplicaCleanRUVnotified); >+ if(clean_vals){ >+ int i; >+ for (i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++){ >+ ra->cleanruv_notified[i] = atoi(clean_vals[i]); >+ } >+ if(i < CLEANRIDSIZ) >+ ra->cleanruv_notified[i] = 0; >+ slapi_ch_array_free(clean_vals); >+ } else { >+ ra->cleanruv_notified[0] = 0; >+ } >+ > /* Fractional attributes */ > slapi_entry_attr_find(e, type_nsds5ReplicatedAttributeList, &sattr); > >@@ -1633,6 +1649,20 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) > return return_value; > } > >+int >+agmt_set_timeout(Repl_Agmt *ra, long timeout) >+{ >+ PR_Lock(ra->lock); >+ if (ra->stop_in_progress){ >+ PR_Unlock(ra->lock); >+ return -1; >+ } >+ ra->timeout = timeout; >+ PR_Unlock(ra->lock); >+ >+ return 0; >+} >+ > /* > * Set or reset the busywaittime > * >@@ -2425,6 +2455,15 @@ ReplicaId agmt_get_consumerRID(Repl_Agmt *ra) > return ra->consumerRID; > } > >+void* agmt_get_connection(Repl_Agmt *ra) >+{ >+ if(ra->protocol){ >+ return (void *)prot_get_connection(ra->protocol); >+ } else { >+ return NULL; >+ } >+} >+ > int > agmt_has_protocol(Repl_Agmt *agmt) > { >@@ -2433,3 +2472,86 @@ agmt_has_protocol(Repl_Agmt *agmt) > } > return 0; > 
} >+ >+int >+agmt_is_cleanruv_notified(Repl_Agmt *ra, ReplicaId rid){ >+ int notified = 0; >+ int i; >+ >+ PR_Lock(ra->lock); >+ for(i = 0; i < CLEANRIDSIZ && ra->cleanruv_notified[i]; i++){ >+ if(ra->cleanruv_notified[i] == rid){ >+ notified = 1; >+ break; >+ } >+ } >+ PR_Unlock(ra->lock); >+ >+ return notified; >+} >+ >+/* >+ * This will trigger agmt_set_cleanruv_notified_from_entry() to be called, >+ * which will update the in memory agmt. >+ * >+ * op can be: CLEANRUV_NOTIFIED or CLEANRUV_RELEASED >+ */ >+int >+agmt_set_cleanruv_data(Repl_Agmt *ra, ReplicaId rid, int op){ >+ Slapi_PBlock *pb; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ struct berval *vals[2]; >+ struct berval val; >+ char data[6]; >+ int rc = 0; >+ >+ if(ra == NULL){ >+ return -1; >+ } >+ >+ if(op == CLEANRUV_NOTIFIED){ >+ /* add the cleanruv data */ >+ mod.mod_op = LDAP_MOD_ADD|LDAP_MOD_BVALUES; >+ } else { >+ /* remove the cleanruv data */ >+ mod.mod_op = LDAP_MOD_DELETE|LDAP_MOD_BVALUES; >+ } >+ >+ pb = slapi_pblock_new(); >+ val.bv_len = PR_snprintf(data, sizeof(data), "%d", (int)rid); >+ mod.mod_type = (char *)type_nsds5ReplicaCleanRUVnotified; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = data; >+ mods[0] = &mod; >+ mods[1] = NULL; >+ >+ slapi_modify_internal_set_pb_ext (pb, ra->dn, mods, NULL, NULL, >+ repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); >+ slapi_modify_internal_pb (pb); >+ slapi_pblock_destroy(pb); >+ >+ return rc; >+} >+ >+void >+agmt_set_cleanruv_notified_from_entry(Repl_Agmt *ra, Slapi_Entry *e){ >+ char **attr_vals = NULL; >+ int i; >+ >+ PR_Lock(ra->lock); >+ attr_vals = slapi_entry_attr_get_charray(e, type_nsds5ReplicaCleanRUVnotified); >+ if(attr_vals){ >+ for (i = 0; i < CLEANRIDSIZ && attr_vals[i]; i++){ >+ ra->cleanruv_notified[i] = atoi(attr_vals[i]); >+ } >+ if( i < CLEANRIDSIZ ) >+ ra->cleanruv_notified[i] = 0; >+ slapi_ch_array_free(attr_vals); >+ } else { >+ ra->cleanruv_notified[0] = 0; >+ } >+ 
PR_Unlock(ra->lock); >+} >diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c >index 8a98c21..6b5dab4 100644 >--- a/ldap/servers/plugins/replication/repl5_agmtlist.c >+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c >@@ -488,6 +488,10 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry > /* ignore modifier's name and timestamp attributes and the description. */ > continue; > } >+ else if (slapi_attr_types_equivalent(mods[i]->mod_type, type_nsds5ReplicaCleanRUVnotified)) >+ { >+ agmt_set_cleanruv_notified_from_entry(agmt, e); >+ } > else if (0 == windows_handle_modify_agreement(agmt, mods[i]->mod_type, e)) > { > slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " >diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c >index a074a9f..51a2bc5 100644 >--- a/ldap/servers/plugins/replication/repl5_connection.c >+++ b/ldap/servers/plugins/replication/repl5_connection.c >@@ -140,7 +140,7 @@ static void repl5_debug_timeout_callback(time_t when, void *arg); > static void close_connection_internal(Repl_Connection *conn); > > /* >- * Create a new conenction object. Returns a pointer to the object, or >+ * Create a new connection object. Returns a pointer to the object, or > * NULL if an error occurs. > */ > Repl_Connection * >@@ -217,6 +217,17 @@ conn_delete_internal(Repl_Connection *conn) > } > > /* >+ * Used by CLEANALLRUV - free it all! >+ */ >+void >+conn_delete_internal_ext(Repl_Connection *conn) >+{ >+ conn_delete_internal(conn); >+ PR_DestroyLock(conn->lock); >+ slapi_ch_free((void **)&conn); >+} >+ >+/* > * Destroy a connection. It is an error to use the connection object > * after conn_delete() has been called. 
> */ >@@ -1710,6 +1721,16 @@ conn_get_timeout(Repl_Connection *conn) > return retval; > } > >+LDAP * >+conn_get_ldap(Repl_Connection *conn) >+{ >+ if(conn){ >+ return conn->ld; >+ } else { >+ return NULL; >+ } >+} >+ > void conn_set_agmt_changed(Repl_Connection *conn) > { > PR_ASSERT(NULL != conn); >@@ -1908,3 +1929,19 @@ repl5_debug_timeout_callback(time_t when, void *arg) > "repl5_debug_timeout_callback: set debug level to %d at %ld\n", > s_debug_level, when); > } >+ >+void >+conn_lock(Repl_Connection *conn) >+{ >+ if(conn){ >+ PR_Lock(conn->lock); >+ } >+} >+ >+void >+conn_unlock(Repl_Connection *conn) >+{ >+ if(conn){ >+ PR_Unlock(conn->lock); >+ } >+} >diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c >index c9ad6fc..eb361e6 100644 >--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c >+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c >@@ -463,7 +463,7 @@ repl5_inc_waitfor_async_results(result_data *rd) > int done = 0; > int loops = 0; > /* Keep pulling results off the LDAP connection until we catch up to the last message id stored in the rd */ >- while (!done) >+ while (!done && !slapi_is_shutting_down()) > { > /* Lock the structure to force memory barrier */ > PR_Lock(rd->lock); >@@ -1571,10 +1571,10 @@ repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_cr > agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/); > } > slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name, >- "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s. %s.\n", >+ "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n", > agmt_get_long_name(prp->agmt), > uniqueid, csn_str, >- ldap_err2string(connection_error), >+ ldap_err2string(connection_error), connection_error, > *finished ? 
"Will retry later" : "Skipping"); > } > else if (CONN_NOT_CONNECTED == replay_crc) >@@ -1585,10 +1585,11 @@ repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_cr > *finished = 1; > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "%s: Consumer failed to replay change (uniqueid %s, CSN %s): " >- "%s. Will retry later.\n", >+ "%s(%d). Will retry later.\n", > agmt_get_long_name(prp->agmt), > uniqueid, csn_str, >- connection_error ? ldap_err2string(connection_error) : "Connection lost"); >+ connection_error ? ldap_err2string(connection_error) : "Connection lost", >+ connection_error); > } > else if (CONN_TIMEOUT == replay_crc) > { >@@ -1631,7 +1632,7 @@ repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_cr > * has already been acquired, (2) that the consumer's update vector has > * been checked and (3) that it's ok to send incremental updates. > * Returns: >- * UPDATE_NO_MORE_UPDATES - all updates were sent succussfully >+ * UPDATE_NO_MORE_UPDATES - all updates were sent successfully > * UPDATE_TRANSIENT_ERROR - some non-permanent error occurred. Try again later. > * UPDATE_FATAL_ERROR - some bad, permanent error occurred. > * UPDATE_SCHEDULE_WINDOW_CLOSED - the schedule window closed on us. 
>@@ -1701,7 +1702,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu > agmt_get_long_name(prp->agmt)); > return_value = UPDATE_FATAL_ERROR; > break; >- case CL5_SYSTEM_ERROR: /* NSPR error occurred: use PR_GetError for furhter info */ >+ case CL5_SYSTEM_ERROR: /* NSPR error occurred: use PR_GetError for further info */ > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "%s: An NSPR error (%d) occurred\n", > agmt_get_long_name(prp->agmt), PR_GetError()); >@@ -1740,7 +1741,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu > break; > case CL5_UNKNOWN_ERROR: /* unclassified error */ > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "%s: An unknown error was ecountered\n", >+ "%s: An unknown error was encountered\n", > agmt_get_long_name(prp->agmt)); > return_value = UPDATE_TRANSIENT_ERROR; > break; >@@ -1926,6 +1927,8 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu > agmt_get_long_name(prp->agmt)); > return_value = UPDATE_FATAL_ERROR; > break; >+ case CL5_IGNORE_OP: >+ break; > default: > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "%s: Unknown error code (%d) returned from cl5GetNextOperationToReplay\n", >@@ -1957,7 +1960,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu > /* Terminate the results reading thread */ > if (!prp->repl50consumer) > { >- /* We need to ensure that we wait until all the responses have been recived from our operations */ >+ /* We need to ensure that we wait until all the responses have been received from our operations */ > if (return_value != UPDATE_CONNECTION_LOST) { > /* if connection was lost/closed, there will be nothing to read */ > repl5_inc_waitfor_async_results(rd); >diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c >index 4e2464c..7a80c6f 100644 >--- a/ldap/servers/plugins/replication/repl5_init.c >+++ 
b/ldap/servers/plugins/replication/repl5_init.c >@@ -118,6 +118,22 @@ static char *response_name_list[] = { > NSDS_REPL_NAME_PREFIX " Response", > NULL > }; >+static char *cleanruv_oid_list[] = { >+ REPL_CLEANRUV_OID, >+ NULL >+}; >+static char *cleanruv_name_list[] = { >+ NSDS_REPL_NAME_PREFIX " Cleanruv", >+ NULL >+}; >+static char *cleanruv_abort_oid_list[] = { >+ REPL_ABORT_CLEANRUV_OID, >+ NULL >+}; >+static char *cleanruv_abort_name_list[] = { >+ NSDS_REPL_NAME_PREFIX " Cleanruv Abort", >+ NULL >+}; > > /* List of plugin identities for every plugin registered. Plugin identity > is passed by the server in the plugin init function and must be supplied >@@ -434,6 +450,51 @@ multimaster_response_extop_init( Slapi_PBlock *pb ) > return rc; > } > >+int >+multimaster_cleanruv_extop_init( Slapi_PBlock *pb ) >+{ >+ int rc= 0; /* OK */ >+ void *identity = NULL; >+ >+ /* get plugin identity and store it to pass to internal operations */ >+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); >+ PR_ASSERT (identity); >+ >+ if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)cleanruv_oid_list ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)cleanruv_name_list ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_cleanruv )) >+ { >+ slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name, "multimaster_cleanruv_extop_init failed\n" ); >+ rc= -1; >+ } >+ >+ return rc; >+} >+ >+int >+multimaster_cleanruv_abort_extop_init( Slapi_PBlock *pb ) >+{ >+ int rc= 0; /* OK */ >+ void *identity = NULL; >+ >+ /* get plugin identity and store it to pass to internal operations */ >+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); >+ PR_ASSERT (identity); >+ >+ if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 || >+ 
slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)cleanruv_abort_oid_list ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)cleanruv_abort_name_list ) != 0 || >+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_abort_cleanruv )) >+ { >+ slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name, "multimaster_cleanruv_abort_extop_init failed\n" ); >+ rc= -1; >+ } >+ >+ return rc; >+} > > static PRBool > check_for_ldif_dump(Slapi_PBlock *pb) >@@ -544,24 +605,22 @@ multimaster_stop( Slapi_PBlock *pb ) > int rc= 0; /* OK */ > > if (!multimaster_stopped_flag) >- { >- if (!is_ldif_dump) >- { >- agmtlist_shutdown(); /* Shut down replication agreements */ >- } >- >+ { >+ if (!is_ldif_dump) >+ { >+ /* Shut down replication agreements */ >+ agmtlist_shutdown(); >+ } >+ /* if we are cleaning a ruv, stop */ >+ stop_ruv_cleaning(); > /* unregister backend state change notification */ > slapi_unregister_backend_state_change((void *)multimaster_be_state_change); >- >- changelog5_cleanup(); /* Shut down the changelog */ >- multimaster_mtnode_extension_destroy(); /* Destroy mapping tree node exts */ >+ changelog5_cleanup(); /* Shut down the changelog */ >+ multimaster_mtnode_extension_destroy(); /* Destroy mapping tree node exts */ > replica_destroy_name_hash(); /* destroy the hash and its remaining content */ > replica_config_destroy (); /* Destroy replica config info */ >- multimaster_stopped_flag = 1; >- /* JCMREPL - Wait for all our threads to stop */ >- /* JCMREPL - Shut down the replication plugin */ >- /* JCMREPL - Mark all the replication plugin interfaces at not enabled. 
*/ >- } >+ multimaster_stopped_flag = 1; >+ } > return rc; > } > >@@ -618,6 +677,8 @@ int replication_multimaster_plugin_init(Slapi_PBlock *pb) > rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_end_extop_init", multimaster_end_extop_init, "Multimaster replication end extended operation plugin", NULL, identity); > rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_total_extop_init", multimaster_total_extop_init, "Multimaster replication total update extended operation plugin", NULL, identity); > rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_response_extop_init", multimaster_response_extop_init, "Multimaster replication extended response plugin", NULL, identity); >+ rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_cleanruv_extop_init", multimaster_cleanruv_extop_init, "Multimaster replication cleanruv extended operation plugin", NULL, identity); >+ rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_cleanruv_abort_extop_init", multimaster_cleanruv_abort_extop_init, "Multimaster replication cleanruv abort extended operation plugin", NULL, identity); > if (0 == rc) > { > multimaster_initialised = 1; >diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c >index c806c08..9da7329 100644 >--- a/ldap/servers/plugins/replication/repl5_plugins.c >+++ b/ldap/servers/plugins/replication/repl5_plugins.c >@@ -1059,6 +1059,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb) > op_params->target_address.uniqueid = slapi_ch_strdup (uniqueid); > } > >+ if( is_cleaned_rid(csn_get_replicaid(op_params->csn))){ >+ /* this RID has been cleaned */ >+ object_release (repl_obj); >+ return 0; >+ } >+ > /* we might have stripped all the mods - in that case we do not > log the operation */ > if (op_params->operation_type != SLAPI_OPERATION_MODIFY || >@@ -1313,7 +1319,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn) > ruv = 
(RUV*)object_get_data (ruv_obj); > PR_ASSERT (ruv); > >- rc = ruv_add_csn_inprogress (ruv, csn); >+ rc = ruv_add_csn_inprogress (ruv, csn); > > object_release (ruv_obj); > object_release (r_obj); >diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c >index db8d5be..6dbd65d 100644 >--- a/ldap/servers/plugins/replication/repl5_replica.c >+++ b/ldap/servers/plugins/replication/repl5_replica.c >@@ -64,20 +64,20 @@ int g_get_shutdown(); > */ > struct replica { > Slapi_DN *repl_root; /* top of the replicated area */ >- char *repl_name; /* unique replica name */ >- PRBool new_name; /* new name was generated - need to be saved */ >+ char *repl_name; /* unique replica name */ >+ PRBool new_name; /* new name was generated - need to be saved */ > ReplicaUpdateDNList updatedn_list; /* list of dns with which a supplier should bind > to update this replica */ > ReplicaType repl_type; /* is this replica read-only ? */ >- PRBool legacy_consumer; /* if true, this replica is supplied by 4.0 consumer */ >- char* legacy_purl; /* partial url of the legacy supplier */ >+ PRBool legacy_consumer; /* if true, this replica is supplied by 4.0 consumer */ >+ char* legacy_purl; /* partial url of the legacy supplier */ > ReplicaId repl_rid; /* replicaID */ > Object *repl_ruv; /* replica update vector */ > PRBool repl_ruv_dirty; /* Dirty flag for ruv */ > CSNPL *min_csn_pl; /* Pending list for minimal CSN */ > void *csn_pl_reg_id; /* registration assignment for csn callbacks */ > unsigned long repl_state_flags; /* state flags */ >- PRUint32 repl_flags; /* persistent, externally visible flags */ >+ PRUint32 repl_flags; /* persistent, externally visible flags */ > PRLock *repl_lock; /* protects entire structure */ > Slapi_Eq_Context repl_eqcxt_rs; /* context to cancel event that saves ruv */ > Slapi_Eq_Context repl_eqcxt_tr; /* context to cancel event that reaps tombstones */ >@@ -88,9 +88,10 @@ struct replica { > PRBool 
tombstone_reap_active; /* TRUE when the tombstone reaper is running */ > long tombstone_reap_interval; /* Time in seconds between tombstone reaping */ > Slapi_ValueSet *repl_referral; /* A list of administrator provided referral URLs */ >- PRBool state_update_inprogress; /* replica state is being updated */ >- PRLock *agmt_lock; /* protects agreement creation, start and stop */ >+ PRBool state_update_inprogress; /* replica state is being updated */ >+ PRLock *agmt_lock; /* protects agreement creation, start and stop */ > char *locking_purl; /* supplier who has exclusive access */ >+ char *repl_cleanruv_data[CLEANRIDSIZ + 1]; > }; > > >@@ -126,6 +127,7 @@ static int replica_log_ruv_elements_nolock (const Replica *r); > static void replica_replace_ruv_tombstone(Replica *r); > static void start_agreements_for_replica (Replica *r, PRBool start); > static void _delete_tombstone(const char *tombstone_dn, const char *uniqueid, int ext_op_flags); >+static void replica_strip_cleaned_rids(Replica *r); > > /* Allocates new replica and reads its state and state of its component from > * various parts of the DIT. 
>@@ -276,6 +278,8 @@ replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation > escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); > } > >+ replica_check_for_tasks(r, e); >+ > done: > if (rc != 0 && r) > { >@@ -312,6 +316,7 @@ replica_destroy(void **arg) > { > Replica *r; > void *repl_name; >+ int i; > > if (arg == NULL) > return; >@@ -398,6 +403,10 @@ replica_destroy(void **arg) > csnplFree(&r->min_csn_pl);; > } > >+ for(i = 0;r->repl_cleanruv_data[i] != NULL; i++){ >+ slapi_ch_free_string(&r->repl_cleanruv_data[i]); >+ } >+ > slapi_ch_free((void **)arg); > } > >@@ -1311,7 +1320,7 @@ replica_reload_ruv (Replica *r) > } > > /* check if there is a changelog and whether this replica logs changes */ >- if (cl5GetState () == CL5_STATE_OPEN && r->repl_flags & REPLICA_LOG_CHANGES) >+ if (cl5GetState () == CL5_STATE_OPEN && (r->repl_flags & REPLICA_LOG_CHANGES)) > { > > /* Compare new ruv to the changelog's upper bound ruv. We could only keep >@@ -1435,7 +1444,7 @@ int replica_check_for_data_reload (Replica *r, void *arg) > PR_ASSERT (r); > > /* check that we have a changelog and if this replica logs changes */ >- if (cl5GetState () == CL5_STATE_OPEN && r->repl_flags & REPLICA_LOG_CHANGES) >+ if (cl5GetState () == CL5_STATE_OPEN && (r->repl_flags & REPLICA_LOG_CHANGES)) > { > /* Compare new ruv to the purge ruv. 
If the new contains csns which > are smaller than those in purge ruv, we need to remove old and >@@ -1570,6 +1579,12 @@ _replica_get_config_entry (const Slapi_DN *root) > return e; > } > >+char * >+replica_get_dn(Replica *r) >+{ >+ return _replica_get_config_dn (r->repl_root); >+} >+ > static int > _replica_check_validity (const Replica *r) > { >@@ -1801,6 +1816,195 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext) > return (_replica_check_validity (r)); > } > >+void >+replica_check_for_tasks(Replica *r, Slapi_Entry *e) >+{ >+ char **clean_vals; >+ >+ if(e == NULL){ >+ return; >+ } >+ /* >+ * check if we are in the middle of a CLEANALLRUV task, >+ * if so set the cleaned rid, and fire off the thread >+ */ >+ if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaCleanRUV)) != NULL) >+ { >+ PRThread *thread = NULL; >+ struct berval *payload = NULL; >+ CSN *maxcsn = NULL; >+ char *csnpart; >+ char *iter; >+ char csnstr[CSN_STRSIZE]; >+ char *ridstr; >+ char *token = NULL; >+ ReplicaId rid; >+ int i; >+ >+ for(i = 0; i < CLEANRIDSIZ && clean_vals[i]; i++){ >+ cleanruv_data *data = NULL; >+ >+ /* >+ * Set the cleanruv data, and add the cleaned rid >+ */ >+ r->repl_cleanruv_data[i] = slapi_ch_strdup(clean_vals[i]); >+ token = ldap_utf8strtok_r(clean_vals[i], ":", &iter); >+ if(token){ >+ rid = atoi(token); >+ if(rid <= 0 || rid >= READ_ONLY_REPLICA_ID){ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV Task: invalid replica id(%d) " >+ "aborting task.\n", rid); >+ goto done; >+ } >+ } else { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV Task: unable to parse cleanallruv " >+ "data (%s), aborting task.\n",clean_vals[i]); >+ goto done; >+ } >+ csnpart = ldap_utf8strtok_r(iter, ":", &iter); >+ maxcsn = csn_new(); >+ csn_init_by_string(maxcsn, csnpart); >+ csn_as_string(maxcsn, PR_FALSE, csnstr); >+ add_cleaned_rid(rid, r, csnstr); >+ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV 
Task: cleanAllRUV task found, " >+ "resuming the cleaning of rid(%d)...\n", rid); >+ /* >+ * Create payload >+ */ >+ ridstr = slapi_ch_smprintf("%d:%s:%s", rid, slapi_sdn_get_dn(replica_get_root(r)), csnstr); >+ payload = create_ruv_payload(ridstr); >+ slapi_ch_free_string(&ridstr); >+ >+ if(payload == NULL){ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV Task: Startup: Failed to " >+ "create extended op payload, aborting task"); >+ csn_free(&maxcsn); >+ goto done; >+ } >+ /* >+ * Setup the data struct, and fire off the thread. >+ */ >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV: failed to allocate cleanruv_data.\n"); >+ csn_free(&maxcsn); >+ } else { >+ /* setup our data */ >+ data->repl_obj = NULL; >+ data->replica = NULL; >+ data->rid = rid; >+ data->task = NULL; >+ data->maxcsn = maxcsn; >+ data->sdn = slapi_sdn_dup(r->repl_root); >+ data->payload = payload; >+ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread_ext, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == NULL) { >+ /* log an error and free everything */ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV: unable to create cleanAllRUV " >+ "thread for rid(%d)\n", (int)data->rid); >+ csn_free(&maxcsn); >+ slapi_sdn_free(&data->sdn); >+ ber_bvfree(data->payload); >+ slapi_ch_free((void **)&data); >+ } >+ } >+ } >+ r->repl_cleanruv_data[i] = NULL; >+ >+done: >+ slapi_ch_array_free(clean_vals); >+ } >+ >+ if ((clean_vals = slapi_entry_attr_get_charray(e, type_replicaAbortCleanRUV)) != NULL) >+ { >+ PRThread *thread = NULL; >+ struct berval *payload; >+ CSN *maxcsn = NULL; >+ char *iter; >+ char *ridstr = NULL; >+ char *repl_root; >+ char *token = NULL; >+ char *certify = NULL; >+ ReplicaId rid; >+ int i; >+ >+ for(i = 0; clean_vals[i]; i++){ >+ cleanruv_data 
*data = NULL; >+ >+ token = ldap_utf8strtok_r(clean_vals[i], ":", &iter); >+ if(token){ >+ rid = atoi(token); >+ if(rid <= 0 || rid >= READ_ONLY_REPLICA_ID){ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: invalid replica id(%d) " >+ "aborting task.\n", rid); >+ goto done2; >+ } >+ } else { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: unable to parse cleanallruv " >+ "data (%s), aborting task.\n",clean_vals[i]); >+ goto done2; >+ } >+ >+ repl_root = ldap_utf8strtok_r(iter, ":", &iter); >+ certify = ldap_utf8strtok_r(iter, ":", &iter); >+ stop_ruv_cleaning(); >+ maxcsn = replica_get_cleanruv_maxcsn(r, rid); >+ delete_cleaned_rid(r, rid, maxcsn); >+ csn_free(&maxcsn); >+ >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: abort task found, " >+ "resuming abort of rid(%d).\n", rid); >+ /* >+ * Setup the data struct, and fire off the abort thread. >+ */ >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: failed to allocate cleanruv_data.\n"); >+ } else { >+ ridstr = slapi_ch_smprintf("%d:%s", rid, repl_root); >+ payload = create_ruv_payload(ridstr); >+ slapi_ch_free_string(&ridstr); >+ >+ if(payload == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: failed to create extended " >+ "op payload\n"); >+ slapi_ch_free((void **)&data); >+ } else { >+ /* setup the data */ >+ data->repl_obj = NULL; >+ data->replica = NULL; >+ data->rid = rid; >+ data->task = NULL; >+ data->payload = payload; >+ data->repl_root = slapi_ch_strdup(repl_root); >+ data->sdn = slapi_sdn_dup(r->repl_root); >+ data->certify = slapi_ch_strdup(certify); >+ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == 
NULL) { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: unable to create abort cleanAllRUV " >+ "thread for rid(%d)\n", (int)data->rid); >+ slapi_sdn_free(&data->sdn); >+ ber_bvfree(data->payload); >+ slapi_ch_free_string(&data->repl_root); >+ slapi_ch_free_string(&data->certify); >+ slapi_ch_free((void **)&data); >+ } >+ } >+ } >+ } >+ >+done2: >+ slapi_ch_array_free(clean_vals); >+ } >+} >+ > /* This function updates the entry to contain information generated > during replica initialization. > Returns 0 if successful and -1 otherwise */ >@@ -2807,9 +3011,9 @@ static const char *root_glue = > static int > replica_create_ruv_tombstone(Replica *r) > { >- int return_value = LDAP_LOCAL_ERROR; >- char *root_entry_str; >- Slapi_Entry *e = NULL; >+ int return_value = LDAP_LOCAL_ERROR; >+ char *root_entry_str; >+ Slapi_Entry *e = NULL; > const char *purl = NULL; > RUV *ruv; > struct berval **bvals = NULL; >@@ -2817,17 +3021,16 @@ replica_create_ruv_tombstone(Replica *r) > int rc; > char ebuf[BUFSIZ]; > >- PR_ASSERT(NULL != r && NULL != r->repl_root); >- root_entry_str = slapi_ch_smprintf(root_glue, slapi_sdn_get_ndn(r->repl_root), >- RUV_STORAGE_ENTRY_UNIQUEID); >+ PR_ASSERT(NULL != r && NULL != r->repl_root); >+ >+ root_entry_str = slapi_ch_smprintf(root_glue, slapi_sdn_get_ndn(r->repl_root), RUV_STORAGE_ENTRY_UNIQUEID); > >- e = slapi_str2entry(root_entry_str, SLAPI_STR2ENTRY_TOMBSTONE_CHECK); >+ e = slapi_str2entry(root_entry_str, SLAPI_STR2ENTRY_TOMBSTONE_CHECK); > if (e == NULL) > goto done; > > /* Add ruv */ >- if (r->repl_ruv == NULL) >- { >+ if (r->repl_ruv == NULL){ > CSNGen *gen; > CSN *csn; > char csnstr [CSN_STRSIZE]; >@@ -2836,42 +3039,34 @@ replica_create_ruv_tombstone(Replica *r) > gen = (CSNGen *)object_get_data(r->repl_csngen); > PR_ASSERT (gen); > >- if (csngen_new_csn(gen, &csn, PR_FALSE /* notify */) == CSN_SUCCESS) >- { >- (void)csn_as_string(csn, PR_FALSE, csnstr); >- csn_free(&csn); >+ if (csngen_new_csn(gen, &csn, 
PR_FALSE /* notify */) == CSN_SUCCESS){ >+ (void)csn_as_string(csn, PR_FALSE, csnstr); >+ csn_free(&csn); > >- /* if this is an updateable replica - add its own >- element to the RUV so that referrals work correctly */ >- if (r->repl_type == REPLICA_TYPE_UPDATABLE) >- purl = multimaster_get_local_purl(); >+ /* >+ * if this is an updateable replica - add its own >+ * element to the RUV so that referrals work correctly >+ */ >+ if (r->repl_type == REPLICA_TYPE_UPDATABLE) >+ purl = multimaster_get_local_purl(); > >- if (ruv_init_new(csnstr, r->repl_rid, purl, &ruv) == RUV_SUCCESS) >- { >- r->repl_ruv = object_new((void*)ruv, (FNFree)ruv_destroy); >- r->repl_ruv_dirty = PR_TRUE; >- return_value = LDAP_SUCCESS; >- } >- else >- { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "Cannot create new replica update vector for %s\n", >- escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); >- ruv_destroy(&ruv); >+ if (ruv_init_new(csnstr, r->repl_rid, purl, &ruv) == RUV_SUCCESS){ >+ r->repl_ruv = object_new((void*)ruv, (FNFree)ruv_destroy); >+ r->repl_ruv_dirty = PR_TRUE; >+ return_value = LDAP_SUCCESS; >+ } else { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Cannot create new replica update vector for %s\n", >+ slapi_sdn_get_dn(r->repl_root)); >+ ruv_destroy(&ruv); > goto done; >- } >- } >- else >- { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "Cannot obtain CSN for new replica update vector for %s\n", >- escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); >- csn_free(&csn); >+ } >+ } else { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Cannot obtain CSN for new replica update vector for %s\n", >+ slapi_sdn_get_dn(r->repl_root)); >+ csn_free(&csn); > goto done; >- } >- } >- else /* failed to write the entry because DB was not initialized - retry */ >- { >+ } >+ } else { /* failed to write the entry because DB was not initialized - retry */ > ruv = (RUV*) object_get_data (r->repl_ruv); > PR_ASSERT (ruv); > } >@@ -2879,30 +3074,22 @@ 
replica_create_ruv_tombstone(Replica *r) > PR_ASSERT (r->repl_ruv); > > rc = ruv_to_bervals(ruv, &bvals); >- if (rc != RUV_SUCCESS) >- { >+ if (rc != RUV_SUCCESS){ > goto done; > } > > /* ONREPL this is depricated function but there is currently no better API to use */ > rc = slapi_entry_add_values(e, type_ruvElement, bvals); >- if (rc != 0) >- { >+ if (rc != 0){ > goto done; > } >- > >- pb = slapi_pblock_new(); >- slapi_add_entry_internal_set_pb( >- pb, >- e, >- NULL /* controls */, >- repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), >- OP_FLAG_TOMBSTONE_ENTRY | OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | >- OP_FLAG_REPL_RUV); >- slapi_add_internal_pb(pb); >- e = NULL; /* add consumes e, upon success or failure */ >- slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value); >+ pb = slapi_pblock_new(); >+ slapi_add_entry_internal_set_pb(pb, e, NULL /* controls */, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), >+ OP_FLAG_TOMBSTONE_ENTRY | OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | OP_FLAG_REPL_RUV); >+ slapi_add_internal_pb(pb); >+ e = NULL; /* add consumes e, upon success or failure */ >+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value); > if (return_value == LDAP_SUCCESS) > r->repl_ruv_dirty = PR_FALSE; > >@@ -2915,51 +3102,50 @@ done: > if (pb) > slapi_pblock_destroy(pb); > >- slapi_ch_free((void **) &root_entry_str); >+ slapi_ch_free_string(&root_entry_str); > >- return return_value; >+ return return_value; > } > > > static void > assign_csn_callback(const CSN *csn, void *data) > { >- Replica *r = (Replica *)data; >+ Replica *r = (Replica *)data; > Object *ruv_obj; > RUV *ruv; > >- PR_ASSERT(NULL != csn); >- PR_ASSERT(NULL != r); >+ PR_ASSERT(NULL != csn); >+ PR_ASSERT(NULL != r); > > ruv_obj = replica_get_ruv (r); > PR_ASSERT (ruv_obj); > ruv = (RUV*)object_get_data (ruv_obj); > PR_ASSERT (ruv); > >- PR_Lock(r->repl_lock); >+ PR_Lock(r->repl_lock); > >- r->repl_csn_assigned = PR_TRUE; >+ r->repl_csn_assigned = PR_TRUE; > >- 
if (NULL != r->min_csn_pl) >- { >- if (csnplInsert(r->min_csn_pl, csn) != 0) >- { >- char ebuf[BUFSIZ]; >- char csn_str[CSN_STRSIZE]; /* For logging only */ >- /* Ack, we can't keep track of min csn. Punt. */ >- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) { >- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "assign_csn_callback: " >- "failed to insert csn %s for replica %s\n", >- csn_as_string(csn, PR_FALSE, csn_str), >- escape_string(slapi_sdn_get_dn(r->repl_root), ebuf)); >- } >- csnplFree(&r->min_csn_pl); >- } >- } >+ if (NULL != r->min_csn_pl) >+ { >+ if (csnplInsert(r->min_csn_pl, csn) != 0) >+ { >+ char csn_str[CSN_STRSIZE]; /* For logging only */ >+ /* Ack, we can't keep track of min csn. Punt. */ >+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) { >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "assign_csn_callback: " >+ "failed to insert csn %s for replica %s\n", >+ csn_as_string(csn, PR_FALSE, csn_str), >+ slapi_sdn_get_dn(r->repl_root)); >+ } >+ csnplFree(&r->min_csn_pl); >+ } >+ } > > ruv_add_csn_inprogress (ruv, csn); > >- PR_Unlock(r->repl_lock); >+ PR_Unlock(r->repl_lock); > > object_release (ruv_obj); > } >@@ -3196,19 +3382,41 @@ replica_set_tombstone_reap_interval (Replica *r, long interval) > PR_Unlock(r->repl_lock); > } > >+static void >+replica_strip_cleaned_rids(Replica *r) >+{ >+ Object *RUVObj; >+ RUV *ruv = NULL; >+ ReplicaId rid[32] = {0}; >+ int i = 0; >+ >+ RUVObj = replica_get_ruv(r); >+ ruv = (RUV*)object_get_data (RUVObj); >+ >+ ruv_get_cleaned_rids(ruv, rid); >+ while(rid[i] != 0){ >+ ruv_delete_replica(ruv, rid[i]); >+ replica_set_ruv_dirty(r); >+ replica_write_ruv(r); >+ i++; >+ } >+ object_release(RUVObj); >+} >+ > /* Update the tombstone entry to reflect the content of the ruv */ > static void > replica_replace_ruv_tombstone(Replica *r) > { > Slapi_PBlock *pb = NULL; >- char *dn; >- int rc; >- > Slapi_Mod smod; > Slapi_Mod smod_last_modified; > LDAPMod *mods [3]; >+ char *dn; >+ int rc; > >- PR_ASSERT(NULL != r && NULL != 
r->repl_root); >+ PR_ASSERT(NULL != r && NULL != r->repl_root); >+ >+ replica_strip_cleaned_rids(r); > > PR_Lock(r->repl_lock); > >@@ -3217,14 +3425,14 @@ replica_replace_ruv_tombstone(Replica *r) > ruv_last_modified_to_smod ((RUV*)object_get_data(r->repl_ruv), &smod_last_modified); > > dn = _replica_get_config_dn (r->repl_root); >- if (NULL == dn) { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "replica_replace_ruv_tombstone: " >- "failed to get the config dn for %s\n", >- slapi_sdn_get_dn (r->repl_root)); >- PR_Unlock(r->repl_lock); >- goto bail; >- } >+ if (NULL == dn) { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ "replica_replace_ruv_tombstone: " >+ "failed to get the config dn for %s\n", >+ slapi_sdn_get_dn (r->repl_root)); >+ PR_Unlock(r->repl_lock); >+ goto bail; >+ } > mods[0] = (LDAPMod*)slapi_mod_get_ldapmod_byref(&smod); > mods[1] = (LDAPMod*)slapi_mod_get_ldapmod_byref(&smod_last_modified); > >@@ -3248,12 +3456,12 @@ replica_replace_ruv_tombstone(Replica *r) > > if (rc != LDAP_SUCCESS) > { >- if ((rc != LDAP_NO_SUCH_OBJECT) || !replica_is_state_flag_set(r, REPLICA_IN_USE)) >- { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_replace_ruv_tombstone: " >- "failed to update replication update vector for replica %s: LDAP " >- "error - %d\n", (char*)slapi_sdn_get_dn (r->repl_root), rc); >- } >+ if ((rc != LDAP_NO_SUCH_OBJECT) || !replica_is_state_flag_set(r, REPLICA_IN_USE)) >+ { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_replace_ruv_tombstone: " >+ "failed to update replication update vector for replica %s: LDAP " >+ "error - %d\n", (char*)slapi_sdn_get_dn (r->repl_root), rc); >+ } > } > > slapi_ch_free ((void**)&dn); >@@ -3269,14 +3477,18 @@ replica_update_ruv_consumer(Replica *r, RUV *supplier_ruv) > ReplicaId supplier_id = 0; > char *supplier_purl = NULL; > >- if ( ruv_get_first_id_and_purl(supplier_ruv, &supplier_id, &supplier_purl) == RUV_SUCCESS ) >+ if ( ruv_get_first_id_and_purl(supplier_ruv, 
&supplier_id, &supplier_purl) == RUV_SUCCESS) > { > RUV *local_ruv = NULL; > > PR_Lock(r->repl_lock); > > local_ruv = (RUV*)object_get_data (r->repl_ruv); >- PR_ASSERT (local_ruv); >+ >+ if(is_cleaned_rid(supplier_id) || local_ruv == NULL){ >+ PR_Unlock(r->repl_lock); >+ return; >+ } > > if ( ruv_local_contains_supplier(local_ruv, supplier_id) == 0 ) > { >@@ -3608,3 +3820,69 @@ replica_get_attr ( Slapi_PBlock *pb, const char* type, void *value ) > > return rc; > } >+ >+void >+replica_add_cleanruv_data(Replica *r, char *val) >+{ >+ int i; >+ >+ PR_Lock(r->repl_lock); >+ >+ for (i = 0; i < CLEANRIDSIZ && r->repl_cleanruv_data[i] != NULL; i++); /* goto the end of the list */ >+ if( i < CLEANRIDSIZ){ >+ r->repl_cleanruv_data[i] = slapi_ch_strdup(val); /* append to list */ >+ r->repl_cleanruv_data[i + 1] = 0; >+ } >+ >+ PR_Unlock(r->repl_lock); >+} >+ >+void >+replica_remove_cleanruv_data(Replica *r, char *val) >+{ >+ int i; >+ >+ PR_Lock(r->repl_lock); >+ >+ for(i = 0; i < CLEANRIDSIZ && r->repl_cleanruv_data[i] && strcmp(r->repl_cleanruv_data[i], val) != 0; i++); >+ if( i < CLEANRIDSIZ ){ >+ slapi_ch_free_string(&r->repl_cleanruv_data[i]); >+ for(; i < CLEANRIDSIZ; i++){ >+ /* rewrite entire array */ >+ r->repl_cleanruv_data[i] = r->repl_cleanruv_data[i + 1]; >+ } >+ } >+ >+ PR_Unlock(r->repl_lock); >+} >+ >+CSN * >+replica_get_cleanruv_maxcsn(Replica *r, ReplicaId rid) >+{ >+ CSN *newcsn; >+ char *csnstr; >+ char *token; >+ char *iter; >+ int repl_rid = 0; >+ int i; >+ >+ PR_Lock(r->repl_lock); >+ >+ for(i = 0; i < CLEANRIDSIZ && r->repl_cleanruv_data[i]; i++){ >+ token = ldap_utf8strtok_r(r->repl_cleanruv_data[i], ":", &iter); >+ if(token){ >+ repl_rid = atoi(token); >+ } >+ csnstr = ldap_utf8strtok_r(iter, ":", &iter); >+ if(repl_rid == rid){ >+ newcsn = csn_new(); >+ csn_init_by_string(newcsn, csnstr); >+ PR_Unlock(r->repl_lock); >+ return newcsn; >+ } >+ } >+ >+ PR_Unlock(r->repl_lock); >+ >+ return NULL; >+} >diff --git 
a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c >index e0e11e9..277afce 100644 >--- a/ldap/servers/plugins/replication/repl5_replica_config.c >+++ b/ldap/servers/plugins/replication/repl5_replica_config.c >@@ -2,15 +2,15 @@ > * This Program is free software; you can redistribute it and/or modify it under > * the terms of the GNU General Public License as published by the Free Software > * Foundation; version 2 of the License. >- * >+ * > * This Program is distributed in the hope that it will be useful, but WITHOUT > * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS > * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. >- * >+ * > * You should have received a copy of the GNU General Public License along with > * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple > * Place, Suite 330, Boston, MA 02111-1307 USA. >- * >+ * > * In addition, as a special exception, Red Hat, Inc. gives You the additional > * right to link the code of this Program with code not covered under the GNU > * General Public License ("Non-GPL Code") and to distribute linked combinations >@@ -28,9 +28,9 @@ > * version of the file, but you are not obligated to do so. If you do not wish to > * provide this exception without modification, you must delete this exception > * statement from your version and license this file solely under the GPL without >- * exception. >- * >- * >+ * exception. >+ * >+ * > * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission. > * Copyright (C) 2005 Red Hat, Inc. > * All rights reserved. 
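Review note on the `replica_get_cleanruv_maxcsn()` hunk in repl5_replica.c above: the loop tokenizes `r->repl_cleanruv_data[i]` in place. `ldap_utf8strtok_r()` follows the `strtok_r` convention, which normally overwrites the delimiter, so the stored `rid:maxcsn` string may be truncated to just the rid after the first lookup. In addition, `repl_rid` keeps its value from the previous iteration when `token` is NULL, and `csnstr` is passed to `csn_init_by_string()` without a NULL check. A minimal sketch of a non-destructive parse follows, assuming the stored value really is `<rid>:<maxcsn>` as the hunk suggests. `parse_cleanruv_data` is a hypothetical helper name, not something in this patch or in the slapi API, and it uses only standard C.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the patch): split a "rid:maxcsn"
 * string without modifying it.  The CSN text is copied into csn_out.
 * Returns 0 on success, -1 on malformed input or a too-small buffer.
 */
static int
parse_cleanruv_data(const char *val, long *rid_out,
                    char *csn_out, size_t csn_len)
{
    char *end = NULL;
    long rid;
    size_t n;

    assert(val != NULL && rid_out != NULL && csn_out != NULL);

    errno = 0;
    rid = strtol(val, &end, 10);
    /* Require at least one digit, a positive rid, and a ':' separator. */
    if (errno != 0 || end == val || *end != ':' || rid <= 0) {
        return -1;
    }
    end++; /* step over the ':' */

    /* The CSN runs up to the next ':' or the end of the string. */
    n = strcspn(end, ":");
    if (n == 0 || n >= csn_len) {
        return -1;
    }
    memcpy(csn_out, end, n);
    csn_out[n] = '\0';
    *rid_out = rid;
    return 0;
}
```

With something along these lines, the caller could compare the parsed rid first and only then call `csn_new()` and `csn_init_by_string()` on the copied CSN text, leaving `repl_cleanruv_data[i]` intact for later lookups.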
>@@ -56,9 +56,19 @@ > #define LDIF2CL_TASK "LDIF2CL" > #define CLEANRUV "CLEANRUV" > #define CLEANRUVLEN 8 >+#define CLEANALLRUV "CLEANALLRUV" >+#define CLEANALLRUVLEN 11 > #define REPLICA_RDN "cn=replica" >+#define CLEANALLRUV_ID "CleanAllRUV Task" >+#define ABORT_CLEANALLRUV_ID "Abort CleanAllRUV Task" > > int slapi_log_urp = SLAPI_LOG_REPL; >+static ReplicaId cleaned_rids[CLEANRIDSIZ + 1] = {0}; >+static ReplicaId aborted_rids[CLEANRIDSIZ + 1] = {0}; >+static Slapi_RWLock *rid_lock = NULL; >+static Slapi_RWLock *abort_rid_lock = NULL; >+static PRLock *notify_lock = NULL; >+static PRCondVar *notify_cvar = NULL; > > /* Forward Declartions */ > static int replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg); >@@ -66,7 +76,7 @@ static int replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* > static int replica_config_post_modify (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg); > static int replica_config_delete (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg); > static int replica_config_search (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg); >- >+static int replica_cleanall_ruv_task(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg); > static int replica_config_change_type_and_id (Replica *r, const char *new_type, const char *new_id, char *returntext, int apply_mods); > static int replica_config_change_updatedn (Replica *r, const LDAPMod *mod, char *returntext, int apply_mods); > static int replica_config_change_flags (Replica *r, const char *new_flags, char *returntext, int apply_mods); >@@ -74,10 +84,22 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt > static int replica_execute_cl2ldif_task (Object *r, char 
*returntext); > static int replica_execute_ldif2cl_task (Object *r, char *returntext); > static int replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext); >+static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, Slapi_Task *task, char *returntext); >+static void replica_cleanallruv_thread(void *arg); >+static void replica_send_cleanruv_task(Repl_Agmt *agmt, ReplicaId rid, Slapi_Task *task); >+static int check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task); >+static int check_agmts_are_caught_up(Replica *replica, ReplicaId rid, char *maxcsn, Slapi_Task *task); >+static int replica_cleanallruv_send_extop(Repl_Agmt *ra, ReplicaId rid, Slapi_Task *task, struct berval *payload, int check_result); >+static int replica_cleanallruv_send_abort_extop(Repl_Agmt *ra, Slapi_Task *task, struct berval *payload); >+static int replica_cleanallruv_check_maxcsn(Repl_Agmt *agmt, char *rid_text, char *maxcsn, Slapi_Task *task); >+static int replica_cleanallruv_replica_alive(Repl_Agmt *agmt); >+static int replica_cleanallruv_check_ruv(Repl_Agmt *ra, char *rid_text, Slapi_Task *task); >+static int get_cleanruv_task_count(); >+static int get_abort_cleanruv_task_count(); > static int replica_cleanup_task (Object *r, const char *task_name, char *returntext, int apply_mods); > static int replica_task_done(Replica *replica); >- > static multimaster_mtnode_extension * _replica_config_get_mtnode_ext (const Slapi_Entry *e); >+int g_get_shutdown(); > > /* > * Note: internal add/modify/delete operations should not be run while >@@ -97,29 +119,59 @@ int > replica_config_init() > { > s_configLock = PR_NewLock (); >+ > if (s_configLock == NULL) > { > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_init: " >- "failed to cretate configuration lock; NSPR error - %d\n", >+ "failed to create configuration lock; NSPR error - %d\n", > PR_GetError ()); > return -1; > } >+ rid_lock = slapi_new_rwlock(); >+ if(rid_lock == NULL) >+ { 
>+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_init: " >+ "failed to create rid_lock; NSPR error - %d\n", PR_GetError ()); >+ return -1; >+ } >+ abort_rid_lock = slapi_new_rwlock(); >+ if(abort_rid_lock == NULL) >+ { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_init: " >+ "failed to create abort_rid_lock; NSPR error - %d\n", PR_GetError ()); >+ return -1; >+ } >+ if ( ( notify_lock = PR_NewLock()) == NULL ) { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_init: " >+ "failed to create notify lock; NSPR error - %d\n", PR_GetError ()); >+ return -1; >+ } >+ if ( ( notify_cvar = PR_NewCondVar( notify_lock )) == NULL ) { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_init: " >+ "failed to create notify cond var; NSPR error - %d\n", PR_GetError ()); >+ return -1; >+ } > > /* config DSE must be initialized before we get here */ > slapi_config_register_callback(SLAPI_OPERATION_ADD, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, >- CONFIG_FILTER, replica_config_add, NULL); >+ CONFIG_FILTER, replica_config_add, NULL); > slapi_config_register_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, replica_config_modify,NULL); > slapi_config_register_callback(SLAPI_OPERATION_MODRDN, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, dont_allow_that, NULL); > slapi_config_register_callback(SLAPI_OPERATION_DELETE, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, >- CONFIG_FILTER, replica_config_delete,NULL); >+ CONFIG_FILTER, replica_config_delete,NULL); > slapi_config_register_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, >- CONFIG_FILTER, replica_config_search,NULL); >- slapi_config_register_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_POSTOP, >+ CONFIG_FILTER, replica_config_search,NULL); >+ slapi_config_register_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_POSTOP, > CONFIG_BASE, 
LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, replica_config_post_modify, > NULL); >+ >+ /* register the CLEANALLRUV & ABORT task */ >+ slapi_task_register_handler("cleanallruv", replica_cleanall_ruv_task); >+ slapi_task_register_handler("abort cleanallruv", replica_cleanall_ruv_abort); >+ > return 0; > } > >@@ -134,7 +186,7 @@ replica_config_destroy () > > /* config DSE must be initialized before we get here */ > slapi_config_remove_callback(SLAPI_OPERATION_ADD, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, >- CONFIG_FILTER, replica_config_add); >+ CONFIG_FILTER, replica_config_add); > slapi_config_remove_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, replica_config_modify); > slapi_config_remove_callback(SLAPI_OPERATION_MODRDN, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, >@@ -143,26 +195,26 @@ replica_config_destroy () > CONFIG_FILTER, replica_config_delete); > slapi_config_remove_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP, CONFIG_BASE, LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, replica_config_search); >- slapi_config_remove_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_PREOP, >+ slapi_config_remove_callback(SLAPI_OPERATION_MODIFY, DSE_FLAG_PREOP, > CONFIG_BASE, LDAP_SCOPE_SUBTREE, > CONFIG_FILTER, replica_config_post_modify); > } > >-static int >-replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, >+static int >+replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, > int *returncode, char *errorbuf, void *arg) > { > Replica *r = NULL; >- multimaster_mtnode_extension *mtnode_ext; >+ multimaster_mtnode_extension *mtnode_ext; > char *replica_root = (char*)slapi_entry_attr_get_charptr (e, attr_replicaRoot); > char buf [SLAPI_DSE_RETURNTEXT_SIZE]; > char *errortext = errorbuf ? 
errorbuf : buf; >- >+ > if (errorbuf) > { > errorbuf[0] = '\0'; >- } >- >+ } >+ > *returncode = LDAP_SUCCESS; > > PR_Lock (s_configLock); >@@ -177,16 +229,16 @@ replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, > { > PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "replica already configured for %s", replica_root); > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_add: %s\n", errortext); >- *returncode = LDAP_UNWILLING_TO_PERFORM; >- goto done; >+ *returncode = LDAP_UNWILLING_TO_PERFORM; >+ goto done; > } > > /* create replica object */ > r = replica_new_from_entry (e, errortext, PR_TRUE /* is a newly added entry */); > if (r == NULL) > { >- *returncode = LDAP_OPERATIONS_ERROR; >- goto done; >+ *returncode = LDAP_OPERATIONS_ERROR; >+ goto done; > } > > /* Set the mapping tree node state, and the referrals from the RUV */ >@@ -212,24 +264,24 @@ done: > > if (*returncode != LDAP_SUCCESS) > { >- if (mtnode_ext->replica) >+ if (mtnode_ext->replica) > object_release (mtnode_ext->replica); > return SLAPI_DSE_CALLBACK_ERROR; > } >- else >+ else > return SLAPI_DSE_CALLBACK_OK; > } > >-static int >-replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* e, >+static int >+replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* e, > int *returncode, char *returntext, void *arg) > { > int rc= 0; > LDAPMod **mods; > int i, apply_mods; >- multimaster_mtnode_extension *mtnode_ext; >+ multimaster_mtnode_extension *mtnode_ext; > Replica *r = NULL; >- char *replica_root = NULL; >+ char *replica_root = NULL; > char buf [SLAPI_DSE_RETURNTEXT_SIZE]; > char *errortext = returntext ? 
returntext : buf; > char *config_attr, *config_attr_value; >@@ -244,7 +296,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > > /* just let internal operations originated from replication plugin to go through */ > slapi_pblock_get (pb, SLAPI_OPERATION, &op); >- slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); >+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); > > if (operation_is_flag_set(op, OP_FLAG_INTERNAL) && > (identity == repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION))) >@@ -256,7 +308,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > replica_root = (char*)slapi_entry_attr_get_charptr (e, attr_replicaRoot); > > PR_Lock (s_configLock); >- >+ > mtnode_ext = _replica_config_get_mtnode_ext (e); > PR_ASSERT (mtnode_ext); > >@@ -268,7 +320,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "replica does not exist for %s", replica_root); > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", > errortext); >- *returncode = LDAP_OPERATIONS_ERROR; >+ *returncode = LDAP_OPERATIONS_ERROR; > goto done; > } > >@@ -279,7 +331,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > for (apply_mods = 0; apply_mods <= 1; apply_mods++) > { > /* we only allow the replica ID and type to be modified together e.g. 
>- if converting a read only replica to a master or vice versa - >+ if converting a read only replica to a master or vice versa - > we will need to change both the replica ID and the type at the same > time - we must disallow changing the replica ID if the type is not > being changed and vice versa >@@ -296,7 +348,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > break; > > config_attr = (char *) mods[i]->mod_type; >- PR_ASSERT (config_attr); >+ PR_ASSERT (config_attr); > > /* disallow modifications or removal of replica root, > replica name and replica state attributes */ >@@ -305,14 +357,14 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > strcasecmp (config_attr, attr_state) == 0) > { > *returncode = LDAP_UNWILLING_TO_PERFORM; >- PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "modification of %s attribute is not allowed", >- config_attr); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", >+ PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "modification of %s attribute is not allowed", >+ config_attr); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", > errortext); > } > /* this is a request to delete an attribute */ >- else if (mods[i]->mod_op & LDAP_MOD_DELETE || mods[i]->mod_bvalues == NULL >- || mods[i]->mod_bvalues[0]->bv_val == NULL) >+ else if ((mods[i]->mod_op & LDAP_MOD_DELETE) || mods[i]->mod_bvalues == NULL >+ || mods[i]->mod_bvalues[0]->bv_val == NULL) > { > /* currently, you can only remove referral, > legacy consumer or bind dn attribute */ >@@ -337,10 +389,9 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > else > { > *returncode = LDAP_UNWILLING_TO_PERFORM; >- PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "deletion of %s attribute is not allowed", config_attr); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", >- errortext); >- 
} >+ PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, "deletion of %s attribute is not allowed", config_attr); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", errortext); >+ } > } > else /* modify an attribute */ > { >@@ -348,8 +399,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > > if (strcasecmp (config_attr, attr_replicaBindDn) == 0) > { >- *returncode = replica_config_change_updatedn (r, mods[i], >- errortext, apply_mods); >+ *returncode = replica_config_change_updatedn (r, mods[i], errortext, apply_mods); > } > else if (strcasecmp (config_attr, attr_replicaType) == 0) > { >@@ -359,16 +409,14 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > { > new_repl_id = slapi_ch_strdup(config_attr_value); > } >- else if (strcasecmp (config_attr, attr_flags) == 0) >+ else if (strcasecmp (config_attr, attr_flags) == 0) > { >- *returncode = replica_config_change_flags (r, config_attr_value, >- errortext, apply_mods); >+ *returncode = replica_config_change_flags (r, config_attr_value, errortext, apply_mods); > } >- else if (strcasecmp (config_attr, TASK_ATTR) == 0) >+ else if (strcasecmp (config_attr, TASK_ATTR) == 0) > { >- *returncode = replica_execute_task (mtnode_ext->replica, config_attr_value, >- errortext, apply_mods); >- } >+ *returncode = replica_execute_task (mtnode_ext->replica, config_attr_value, errortext, apply_mods); >+ } > else if (strcasecmp (config_attr, attr_replicaReferral) == 0) > { > if (apply_mods) >@@ -383,14 +431,14 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > if (!replica_is_legacy_consumer (r)) { > consumer5_set_mapping_tree_state_for_replica(r, NULL); > } >- } >+ } > } > else if (strcasecmp (config_attr, type_replicaPurgeDelay) == 0) > { >- if (apply_mods && config_attr_value && config_attr_value[0]) >+ if (apply_mods && config_attr_value && config_attr_value[0]) > { > PRUint32 delay; >- if (isdigit 
(config_attr_value[0])) >+ if (isdigit (config_attr_value[0])) > { > delay = (unsigned int)atoi(config_attr_value); > replica_set_purge_delay(r, delay); >@@ -401,7 +449,7 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > } > else if (strcasecmp (config_attr, type_replicaTombstonePurgeInterval) == 0) > { >- if (apply_mods && config_attr_value && config_attr_value[0]) >+ if (apply_mods && config_attr_value && config_attr_value[0]) > { > long interval; > interval = atol (config_attr_value); >@@ -431,17 +479,14 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > *returncode = LDAP_UNWILLING_TO_PERFORM; > PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, > "modification of attribute %s is not allowed in replica entry", config_attr); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", >- errortext); >- } >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", errortext); >+ } > } > } > > if (new_repl_id || new_repl_type) > { >- *returncode = replica_config_change_type_and_id(r, new_repl_type, >- new_repl_id, errortext, >- apply_mods); >+ *returncode = replica_config_change_type_and_id(r, new_repl_type, new_repl_id, errortext, apply_mods); > slapi_ch_free_string(&new_repl_id); > slapi_ch_free_string(&new_repl_type); > } >@@ -450,9 +495,9 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* > done: > if (mtnode_ext->replica) > object_release (mtnode_ext->replica); >- >+ > /* slapi_ch_free accepts NULL pointer */ >- slapi_ch_free ((void**)&replica_root); >+ slapi_ch_free_string(&replica_root); > > PR_Unlock (s_configLock); > >@@ -466,9 +511,9 @@ done: > } > } > >-static int >+static int > replica_config_post_modify(Slapi_PBlock *pb, >- Slapi_Entry* entryBefore, >+ Slapi_Entry* entryBefore, > Slapi_Entry* e, > int *returncode, > char *returntext, >@@ -477,9 +522,9 @@ replica_config_post_modify(Slapi_PBlock *pb, > int 
rc= 0; > LDAPMod **mods; > int i, apply_mods; >- multimaster_mtnode_extension *mtnode_ext; >+ multimaster_mtnode_extension *mtnode_ext; > Replica *r = NULL; >- char *replica_root = NULL; >+ char *replica_root = NULL; > char buf [SLAPI_DSE_RETURNTEXT_SIZE]; > char *errortext = returntext ? returntext : buf; > char *config_attr, *config_attr_value; >@@ -495,7 +540,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > > /* just let internal operations originated from replication plugin to go through */ > slapi_pblock_get (pb, SLAPI_OPERATION, &op); >- slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); >+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity); > > if (operation_is_flag_set(op, OP_FLAG_INTERNAL) && > (identity == repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION))) >@@ -507,7 +552,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > replica_root = (char*)slapi_entry_attr_get_charptr (e, attr_replicaRoot); > > PR_Lock (s_configLock); >- >+ > mtnode_ext = _replica_config_get_mtnode_ext (e); > PR_ASSERT (mtnode_ext); > >@@ -521,7 +566,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_config_post_modify: %s\n", > errortext); >- *returncode = LDAP_OPERATIONS_ERROR; >+ *returncode = LDAP_OPERATIONS_ERROR; > goto done; > } > >@@ -532,7 +577,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > for (apply_mods = 0; apply_mods <= 1; apply_mods++) > { > /* we only allow the replica ID and type to be modified together e.g. 
>- if converting a read only replica to a master or vice versa - >+ if converting a read only replica to a master or vice versa - > we will need to change both the replica ID and the type at the same > time - we must disallow changing the replica ID if the type is not > being changed and vice versa >@@ -546,7 +591,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > break; > > config_attr = (char *) mods[i]->mod_type; >- PR_ASSERT (config_attr); >+ PR_ASSERT (config_attr); > > /* disallow modifications or removal of replica root, > replica name and replica state attributes */ >@@ -556,16 +601,16 @@ replica_config_post_modify(Slapi_PBlock *pb, > { > *returncode = LDAP_UNWILLING_TO_PERFORM; > PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, >- "modification of %s attribute is not allowed", >- config_attr); >+ "modification of %s attribute is not allowed", >+ config_attr); > slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "replica_config_post_modify: %s\n", >+ "replica_config_post_modify: %s\n", > errortext); > } > /* this is a request to delete an attribute */ >- else if (mods[i]->mod_op & LDAP_MOD_DELETE || >+ else if ((mods[i]->mod_op & LDAP_MOD_DELETE) || > mods[i]->mod_bvalues == NULL || >- mods[i]->mod_bvalues[0]->bv_val == NULL) >+ mods[i]->mod_bvalues[0]->bv_val == NULL) > { > ; > } >@@ -573,7 +618,7 @@ replica_config_post_modify(Slapi_PBlock *pb, > { > config_attr_value = (char *) mods[i]->mod_bvalues[0]->bv_val; > >- if (strcasecmp (config_attr, TASK_ATTR) == 0) >+ if (strcasecmp (config_attr, TASK_ATTR) == 0) > { > flag_need_cleanup = 1; > } >@@ -597,7 +642,7 @@ done: > > if (mtnode_ext->replica) > object_release (mtnode_ext->replica); >- >+ > if (*returncode != LDAP_SUCCESS) > { > return SLAPI_DSE_CALLBACK_ERROR; >@@ -608,8 +653,8 @@ done: > } > } > >-static int >-replica_config_delete (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, >+static int >+replica_config_delete (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, > int 
*returncode, char *returntext, void *arg) > { > multimaster_mtnode_extension *mtnode_ext; >@@ -644,26 +689,26 @@ replica_config_delete (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter > return SLAPI_DSE_CALLBACK_OK; > } > >-static int >-replica_config_search (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, >+static int >+replica_config_search (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, > char *returntext, void *arg) > { >- multimaster_mtnode_extension *mtnode_ext; >+ multimaster_mtnode_extension *mtnode_ext; > int changeCount = 0; > PRBool reapActive = PR_FALSE; > char val [64]; > > /* add attribute that contains number of entries in the changelog for this replica */ >- >+ > PR_Lock (s_configLock); >- >+ > mtnode_ext = _replica_config_get_mtnode_ext (e); > PR_ASSERT (mtnode_ext); >- >+ > if (mtnode_ext->replica) { > Replica *replica; > object_acquire (mtnode_ext->replica); >- if (cl5GetState () == CL5_STATE_OPEN) { >+ if (cl5GetState () == CL5_STATE_OPEN) { > changeCount = cl5GetOperationCount (mtnode_ext->replica); > } > replica = (Replica*)object_get_data (mtnode_ext->replica); >@@ -682,9 +727,9 @@ replica_config_search (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter > return SLAPI_DSE_CALLBACK_OK; > } > >-static int >+static int > replica_config_change_type_and_id (Replica *r, const char *new_type, >- const char *new_id, char *returntext, >+ const char *new_id, char *returntext, > int apply_mods) > { > int type; >@@ -764,8 +809,8 @@ replica_config_change_type_and_id (Replica *r, const char *new_type, > return LDAP_SUCCESS; > } > >-static int >-replica_config_change_updatedn (Replica *r, const LDAPMod *mod, char *returntext, >+static int >+replica_config_change_updatedn (Replica *r, const LDAPMod *mod, char *returntext, > int apply_mods) > { > PR_ASSERT (r); >@@ -784,7 +829,7 @@ replica_config_change_updatedn (Replica *r, const LDAPMod *mod, char *returntext > return LDAP_SUCCESS; > 
} > >-static int replica_config_change_flags (Replica *r, const char *new_flags, >+static int replica_config_change_flags (Replica *r, const char *new_flags, > char *returntext, int apply_mods) > { > PR_ASSERT (r); >@@ -801,10 +846,10 @@ static int replica_config_change_flags (Replica *r, const char *new_flags, > return LDAP_SUCCESS; > } > >-static int replica_execute_task (Object *r, const char *task_name, char *returntext, >+static int replica_execute_task (Object *r, const char *task_name, char *returntext, > int apply_mods) > { >- >+ > if (strcasecmp (task_name, CL2LDIF_TASK) == 0) > { > if (apply_mods) >@@ -827,10 +872,8 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt > { > int temprid = atoi(&(task_name[CLEANRUVLEN])); > if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){ >- PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, >- "Invalid replica id for task - %s", task_name); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "replica_execute_task: %s\n", returntext); >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - %s", temprid, task_name); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task: %s\n", returntext); > return LDAP_OPERATIONS_ERROR; > } > if (apply_mods) >@@ -840,25 +883,41 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt > else > return LDAP_SUCCESS; > } >+ else if (strncasecmp (task_name, CLEANALLRUV, CLEANALLRUVLEN) == 0) >+ { >+ int temprid = atoi(&(task_name[CLEANALLRUVLEN])); >+ if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - (%s)", temprid, task_name); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task: %s\n", returntext); >+ return LDAP_OPERATIONS_ERROR; >+ } >+ if (apply_mods) >+ { >+ Slapi_Task *empty_task = NULL; >+ return replica_execute_cleanall_ruv_task(r, 
(ReplicaId)temprid, empty_task, returntext); >+ } >+ else >+ return LDAP_SUCCESS; >+ } > else > { > PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "unsupported replica task - %s", task_name); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >- "replica_execute_task: %s\n", returntext); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ "replica_execute_task: %s\n", returntext); > return LDAP_OPERATIONS_ERROR; > } >- >+ > } > >-static int >-replica_cleanup_task (Object *r, const char *task_name, char *returntext, >+static int >+replica_cleanup_task (Object *r, const char *task_name, char *returntext, > int apply_mods) > { > int rc = LDAP_SUCCESS; > if (apply_mods) { > Replica *replica = (Replica*)object_get_data (r); > if (NULL == replica) { >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > } else { > rc = replica_task_done(replica); > } >@@ -894,19 +953,19 @@ replica_task_done(Replica *replica) > mod.mod_type = (char *)TASK_ATTR; > mod.mod_bvalues = NULL; > >- slapi_modify_internal_set_pb_ext(pb, replica_sdn, mods, NULL/* controls */, >- NULL/* uniqueid */, >+ slapi_modify_internal_set_pb_ext(pb, replica_sdn, mods, NULL/* controls */, >+ NULL/* uniqueid */, > repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), > 0/* flags */); > slapi_modify_internal_pb (pb); > > slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); > if ((rc != LDAP_SUCCESS) && (rc != LDAP_NO_SUCH_ATTRIBUTE)) { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_task_done: " > "failed to remove (%s) attribute from (%s) entry; " > "LDAP error - %d\n", >- TASK_ATTR, replica_dn, rc); >+ TASK_ATTR, replica_dn, rc); > } > > slapi_pblock_destroy (pb); >@@ -926,9 +985,9 @@ static int replica_execute_cl2ldif_task (Object *r, char *returntext) > if (cl5GetState () != CL5_STATE_OPEN) > { > PR_snprintf (returntext, SLAPI_DSE_RETURNTEXT_SIZE, "changelog is not open"); >- slapi_log_error(SLAPI_LOG_FATAL, 
repl_plugin_name, >- "replica_execute_cl2ldif_task: %s\n", returntext); >- rc = LDAP_OPERATIONS_ERROR; >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ "replica_execute_cl2ldif_task: %s\n", returntext); >+ rc = LDAP_OPERATIONS_ERROR; > goto bail; > } > >@@ -939,25 +998,25 @@ static int replica_execute_cl2ldif_task (Object *r, char *returntext) > <replica name>.ldif */ > clDir = cl5GetDir (); > if (NULL == clDir) { >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > goto bail; > } > > replica = (Replica*)object_get_data (r); > if (NULL == replica) { >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > goto bail; > } > > PR_snprintf (fName, MAXPATHLEN, "%s/%s.ldif", clDir, replica_get_name (replica)); > slapi_ch_free_string (&clDir); > >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "Beginning changelog export of replica \"%s\"\n", > replica_get_name(replica)); > rc = cl5ExportLDIF (fName, rlist); > if (rc == CL5_SUCCESS) { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "Finished changelog export of replica \"%s\"\n", > replica_get_name(replica)); > rc = LDAP_SUCCESS; >@@ -965,9 +1024,9 @@ static int replica_execute_cl2ldif_task (Object *r, char *returntext) > PR_snprintf (returntext, SLAPI_DSE_RETURNTEXT_SIZE, > "Failed changelog export replica %s; " > "changelog error - %d", replica_get_name(replica), rc); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_execute_cl2ldif_task: %s\n", returntext); >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > } > bail: > return rc; >@@ -985,8 +1044,8 @@ static int replica_execute_ldif2cl_task (Object *r, char *returntext) > if (cl5GetState () != CL5_STATE_OPEN) > { > PR_snprintf (returntext, SLAPI_DSE_RETURNTEXT_SIZE, "changelog is not open"); >- slapi_log_error(SLAPI_LOG_FATAL, 
repl_plugin_name, >- "replica_execute_ldif2cl_task: %s\n", returntext); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ "replica_execute_ldif2cl_task: %s\n", returntext); > rc = LDAP_OPERATIONS_ERROR; > goto bail; > } >@@ -998,13 +1057,13 @@ static int replica_execute_ldif2cl_task (Object *r, char *returntext) > <replica name>.ldif */ > clDir = cl5GetDir (); > if (NULL == clDir) { >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > goto bail; > } > > replica = (Replica*)object_get_data (r); > if (NULL == replica) { >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > goto bail; > } > >@@ -1016,19 +1075,19 @@ static int replica_execute_ldif2cl_task (Object *r, char *returntext) > PR_snprintf (returntext, SLAPI_DSE_RETURNTEXT_SIZE, > "failed to close changelog to import changelog data; " > "changelog error - %d", rc); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_execute_ldif2cl_task: %s\n", returntext); > rc = LDAP_OPERATIONS_ERROR; > goto bail; > } >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "Beginning changelog import of replica \"%s\"\n", > replica_get_name(replica)); > imprc = cl5ImportLDIF (clDir, fName, rlist); > slapi_ch_free_string (&clDir); > if (CL5_SUCCESS == imprc) > { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "Finished changelog import of replica \"%s\"\n", > replica_get_name(replica)); > } >@@ -1037,7 +1096,7 @@ static int replica_execute_ldif2cl_task (Object *r, char *returntext) > PR_snprintf (returntext, SLAPI_DSE_RETURNTEXT_SIZE, > "Failed changelog import replica %s; " > "changelog error - %d", replica_get_name(replica), rc); >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_execute_ldif2cl_task: %s\n", returntext); > imprc = 
LDAP_OPERATIONS_ERROR; > } >@@ -1050,10 +1109,10 @@ static int replica_execute_ldif2cl_task (Object *r, char *returntext) > } > else > { >- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, > "replica_execute_ldif2cl_task: failed to start changelog at %s\n", > config.dir?config.dir:"null config dir"); >- rc = LDAP_OPERATIONS_ERROR; >+ rc = LDAP_OPERATIONS_ERROR; > } > bail: > changelog5_config_done(&config); >@@ -1061,7 +1120,7 @@ bail: > return imprc?imprc:rc; > } > >-static multimaster_mtnode_extension * >+static multimaster_mtnode_extension * > _replica_config_get_mtnode_ext (const Slapi_Entry *e) > { > const char *replica_root; >@@ -1094,29 +1153,35 @@ _replica_config_get_mtnode_ext (const Slapi_Entry *e) > else > { > /* check if replica object already exists for the specified subtree */ >- ext = (multimaster_mtnode_extension *)repl_con_get_ext (REPL_CON_EXT_MTNODE, mtnode); >+ ext = (multimaster_mtnode_extension *)repl_con_get_ext (REPL_CON_EXT_MTNODE, mtnode); > } >- >+ > slapi_sdn_free (&sdn); > > return ext; > } > >+int >+replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid) >+{ >+ return replica_execute_cleanruv_task(r, rid, NULL); >+} >+ > static int >-replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext) >+replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not used */) > { >- int rc = 0; > Object *RUVObj; > RUV *local_ruv = NULL; >- Replica *replica = (Replica*)object_get_data (r); >- >- PR_ASSERT (replica); >+ Replica *replica = (Replica*)object_get_data (r); >+ int rc = 0; >+ PR_ASSERT (replica); > >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: cleaning rid (%d)...\n",(int)rid); > RUVObj = replica_get_ruv(replica); > PR_ASSERT(RUVObj); > local_ruv = (RUV*)object_get_data (RUVObj); >- /* Need to check that : >- * - rid is not the local one >+ /* Need to check that : >+ * - rid is not the local one > * - rid is not the last 
one > */ > if ((replica_get_rid(replica) == rid) || >@@ -1130,11 +1195,1634 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext) > > /* Update Mapping Tree to reflect RUV changes */ > consumer5_set_mapping_tree_state_for_replica(replica, NULL); >- >+ >+ /* >+ * Clean the changelog RUV's >+ */ >+ cl5CleanRUV(rid); >+ > if (rc != RUV_SUCCESS){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc); > return LDAP_OPERATIONS_ERROR; > } >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: finished successfully\n"); > return LDAP_SUCCESS; > } > >+const char * >+fetch_attr(Slapi_Entry *e, const char *attrname, const char *default_val) >+{ >+ Slapi_Attr *attr; >+ Slapi_Value *val = NULL; >+ >+ if (slapi_entry_attr_find(e, attrname, &attr) != 0) >+ return default_val; >+ >+ slapi_attr_first_value(attr, &val); >+ return slapi_value_get_string(val); >+} >+ >+static int >+replica_cleanall_ruv_task(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, >+ int *returncode, char *returntext, void *arg) >+{ >+ Slapi_Task *task = NULL; >+ const Slapi_DN *task_dn; >+ Slapi_DN *dn = NULL; >+ Object *r; >+ const char *base_dn; >+ const char *rid_str; >+ ReplicaId rid; >+ int rc = SLAPI_DSE_CALLBACK_OK; >+ >+ /* allocate new task now */ >+ task = slapi_new_task(slapi_entry_get_ndn(e)); >+ if(task == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Failed to create new task\n"); >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ >+ /* >+ * Get our task settings >+ */ >+ if ((base_dn = fetch_attr(e, "replica-base-dn", 0)) == NULL){ >+ *returncode = LDAP_OBJECT_CLASS_VIOLATION; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ if ((rid_str = fetch_attr(e, "replica-id", 0)) == NULL){ >+ *returncode = LDAP_OBJECT_CLASS_VIOLATION; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ >+ task_dn = slapi_entry_get_sdn(e); >+ /* >+ * Check the rid >+ */ >+ rid = atoi(rid_str); 
>+ if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - (%s)", >+ rid, slapi_sdn_get_dn(task_dn)); >+ cleanruv_log(task, CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ /* >+ * Get the replica object >+ */ >+ dn = slapi_sdn_new_dn_byval(base_dn); >+ if((r = replica_get_replica_from_dn(dn)) == NULL){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Could not find replica from dn(%s)",slapi_sdn_get_dn(dn)); >+ cleanruv_log(task, CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ >+ /* clean the RUV's */ >+ rc = replica_execute_cleanall_ruv_task (r, rid, task, returntext); >+ >+out: >+ if(rc){ >+ cleanruv_log(task, CLEANALLRUV_ID, "Task failed...(%d)", rc); >+ slapi_task_finish(task, *returncode); >+ } else { >+ rc = SLAPI_DSE_CALLBACK_OK; >+ } >+ slapi_sdn_free(&dn); >+ >+ return rc; >+} >+ >+/* >+ * CLEANALLRUV task >+ * >+ * [1] Get the maxcsn from the RUV of the rid we want to clean >+ * [2] Create the payload for the "cleanallruv" extended ops >+ * [3] Create "monitor" thread to do the real work. 
>+ * >+ */ >+static int >+replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, Slapi_Task *task, char *returntext) >+{ >+ PRThread *thread = NULL; >+ Slapi_Task *pre_task = NULL; /* this is supposed to be null for logging */ >+ Replica *replica; >+ Object *ruv_obj; >+ cleanruv_data *data = NULL; >+ CSN *maxcsn = NULL; >+ const RUV *ruv; >+ struct berval *payload = NULL; >+ char *ridstr = NULL; >+ char csnstr[CSN_STRSIZE]; >+ int rc = 0; >+ >+ if(get_cleanruv_task_count() >= CLEANRIDSIZ){ >+ /* we are already running the maximum number of tasks */ >+ cleanruv_log(pre_task, CLEANALLRUV_ID, >+ "Exceeded maximum number of active CLEANALLRUV tasks(%d)",CLEANRIDSIZ); >+ return LDAP_UNWILLING_TO_PERFORM; >+ } >+ >+ /* >+ * Grab the replica >+ */ >+ replica = (Replica*)object_get_data (r); >+ /* >+ * Check if this is a consumer >+ */ >+ if(replica_get_type(replica) == REPLICA_TYPE_READONLY){ >+ /* this is a consumer, send error */ >+ cleanruv_log(pre_task, CLEANALLRUV_ID, "Failed to clean rid (%d), task can not be run on a consumer",rid); >+ if(task){ >+ rc = -1; >+ slapi_task_finish(task, rc); >+ } >+ return -1; >+ } >+ /* >+ * Grab the max csn of the deleted replica >+ */ >+ ruv_obj = replica_get_ruv(replica); >+ ruv = object_get_data (ruv_obj); >+ if(ruv_get_rid_max_csn(ruv, &maxcsn, rid) == RUV_BAD_DATA){ >+ /* no maxcsn, can not proceed */ >+ cleanruv_log(pre_task, CLEANALLRUV_ID, "Could not find maxcsn for rid (%d)", rid); >+ rc = -1; >+ object_release(ruv_obj); >+ goto fail; >+ } else { >+ object_release(ruv_obj); >+ if(maxcsn == NULL || csn_get_replicaid(maxcsn) == 0){ >+ /* >+ * This is for consistency with extop csn creation, where >+ * we want the csn string to be "0000000000000000000" not "" >+ */ >+ csn_free(&maxcsn); >+ maxcsn = csn_new(); >+ csn_init_by_string(maxcsn, ""); >+ } >+ csn_as_string(maxcsn, PR_FALSE, csnstr); >+ } >+ /* >+ * Create payload >+ */ >+ ridstr = slapi_ch_smprintf("%d:%s:%s", rid, slapi_sdn_get_dn(replica_get_root(replica)), 
csnstr); >+ payload = create_ruv_payload(ridstr); >+ slapi_ch_free_string(&ridstr); >+ >+ if(payload == NULL){ >+ cleanruv_log(pre_task, CLEANALLRUV_ID, "Failed to create extended op payload, aborting task"); >+ rc = -1; >+ goto fail; >+ } >+ >+ /* >+ * Launch the cleanallruv thread. Once all the replicas are cleaned it will release the rid >+ */ >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ cleanruv_log(pre_task, CLEANALLRUV_ID, "Failed to allocate cleanruv_data. Aborting task."); >+ rc = -1; >+ goto fail; >+ } >+ data->repl_obj = r; >+ data->replica = replica; >+ data->rid = rid; >+ data->task = task; >+ data->maxcsn = maxcsn; >+ data->payload = payload; >+ data->sdn = NULL; >+ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == NULL) { >+ rc = -1; >+ goto fail; >+ } else { >+ goto done; >+ } >+ >+fail: >+ cleanruv_log(pre_task, CLEANALLRUV_ID, "Failed to clean rid (%d)",rid); >+ if(task){ >+ slapi_task_finish(task, rc); >+ } >+ if(payload){ >+ ber_bvfree(payload); >+ } >+ csn_free(&maxcsn); >+ if(task) /* only the task acquires the r obj */ >+ object_release (r); >+ >+done: >+ >+ return rc; >+} >+ >+void >+replica_cleanallruv_thread_ext(void *arg) >+{ >+ replica_cleanallruv_thread(arg); >+} >+ >+/* >+ * CLEANALLRUV Thread >+ * >+ * [1] Wait for the maxcsn to be covered >+ * [2] Make sure all the replicas are alive >+ * [3] Set the cleaned rid >+ * [4] Send the cleanAllRUV extop to all the replicas >+ * [5] Manually send the CLEANRUV task to replicas that do not support CLEANALLRUV >+ * [6] Wait for all the replicas to be cleaned. >+ * [7] Trigger cl trimming, release the rid, and remove all the "cleanallruv" attributes >+ * from the config. 
>+ */ >+static void >+replica_cleanallruv_thread(void *arg) >+{ >+ Object *ruv_obj = NULL; >+ Object *agmt_obj = NULL; >+ Repl_Agmt *agmt = NULL; >+ RUV *ruv = NULL; >+ cleanruv_data *data = arg; >+ char csnstr[CSN_STRSIZE]; >+ char *returntext = NULL; >+ char *rid_text = NULL; >+ int found_dirty_rid = 1; >+ int agmt_not_notified = 1; >+ int interval = 10; >+ int aborted = 0; >+ int free_obj = 0; >+ int rc = 0; >+ >+ /* >+ * Initialize our settings >+ */ >+ if(data->replica == NULL && data->repl_obj == NULL){ >+ /* >+ * This thread was initiated at startup because the process did not finish. Due >+ * to startup timing issues, we need to wait before grabbing the replica obj, as >+ * the backends might not be online yet. >+ */ >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(5) ); >+ PR_Unlock( notify_lock ); >+ data->repl_obj = replica_get_replica_from_dn(data->sdn); >+ if(data->repl_obj == NULL){ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Unable to retrieve repl object from dn(%s).", slapi_sdn_get_dn(data->sdn)); >+ aborted = 1; >+ goto done; >+ } >+ data->replica = (Replica*)object_get_data(data->repl_obj); >+ free_obj = 1; >+ } else if(data->replica == NULL && data->repl_obj){ >+ data->replica = (Replica*)object_get_data(data->repl_obj); >+ } else if( data->repl_obj == NULL && data->replica){ >+ data->repl_obj = object_new(data->replica, NULL); >+ free_obj = 1; >+ } >+ /* verify we have set our repl objects */ >+ if(data->repl_obj == NULL || data->replica == NULL){ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Unable to set the replica objects."); >+ aborted = 1; >+ goto done; >+ } >+ if(data->task){ >+ slapi_task_begin(data->task, 1); >+ } >+ rid_text = slapi_ch_smprintf("{replica %d ldap", data->rid); >+ csn_as_string(data->maxcsn, PR_FALSE, csnstr); >+ >+ /* >+ * Add the cleanallruv task to the repl config - so we can handle restarts >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Cleaning rid (%d)...", data->rid); >+ 
add_cleaned_rid(data->rid, data->replica, csnstr); /* marks config that we started cleaning a rid */ >+ /* >+ * First, wait for the maxcsn to be covered >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Waiting to process all the updates from the deleted replica..."); >+ ruv_obj = replica_get_ruv(data->replica); >+ ruv = object_get_data (ruv_obj); >+ while(data->maxcsn && !is_task_aborted(data->rid) && !is_cleaned_rid(data->rid) && !slapi_is_shutting_down()){ >+ if(csn_get_replicaid(data->maxcsn) == 0 || ruv_covers_csn_cleanallruv(ruv,data->maxcsn)){ >+ /* We are caught up, now we can clean the ruv's */ >+ break; >+ } >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(5) ); >+ PR_Unlock( notify_lock ); >+ } >+ object_release(ruv_obj); >+ /* >+ * Next, make sure all the replicas are up and running before sending off the clean ruv tasks >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Waiting for all the replicas to be online..."); >+ if(check_agmts_are_alive(data->replica, data->rid, data->task)){ >+ /* error, aborted or shutdown */ >+ aborted = 1; >+ goto done; >+ } >+ /* >+ * Make sure all the replicas have seen the max csn >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Waiting for all the replicas to receive all the deleted replica updates..."); >+ if(check_agmts_are_caught_up(data->replica, data->rid, csnstr, data->task)){ >+ /* error, aborted or shutdown */ >+ aborted = 1; >+ goto done; >+ } >+ /* >+ * Set the rid as notified - this blocks the changelog from sending out updates >+ * during this process, as well as prevents the db ruv from getting polluted. 
>+ */ >+ set_cleaned_rid(data->rid); >+ /* >+ * Now send the cleanruv extended op to all the agreements >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Sending cleanAllRUV task to all the replicas..."); >+ while(agmt_not_notified && !is_task_aborted(data->rid) && !slapi_is_shutting_down()){ >+ agmt_obj = agmtlist_get_first_agreement_for_replica (data->replica); >+ if(agmt_obj == NULL){ >+ /* no agmts, just clean this replica */ >+ break; >+ } >+ while (agmt_obj){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ agmt_not_notified = 0; >+ continue; >+ } >+ if(replica_cleanallruv_send_extop(agmt, data->rid, data->task, data->payload, 1) == 0){ >+ agmt_not_notified = 0; >+ } else { >+ agmt_not_notified = 1; >+ break; >+ } >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ } >+ >+ if(is_task_aborted(data->rid)){ >+ aborted = 1; >+ goto done; >+ } >+ if(agmt_not_notified == 0){ >+ break; >+ } >+ /* >+ * need to sleep between passes >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Not all replicas have received the " >+ "cleanallruv extended op, retrying in %d seconds",interval); >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); >+ PR_Unlock( notify_lock ); >+ >+ if(interval < 14400){ /* 4 hour max */ >+ interval = interval * 2; >+ } else { >+ interval = 14400; >+ } >+ } >+ /* >+ * Run the CLEANRUV task >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Cleaning local ruv's..."); >+ replica_execute_cleanruv_task (data->repl_obj, data->rid, returntext); >+ /* >+ * Wait for all the replicas to be cleaned >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Waiting for all the replicas to be cleaned..."); >+ >+ interval = 10; >+ while(found_dirty_rid && !is_task_aborted(data->rid) && !slapi_is_shutting_down()){ >+ agmt_obj = 
agmtlist_get_first_agreement_for_replica (data->replica); >+ if(agmt_obj == NULL){ >+ break; >+ } >+ while (agmt_obj && !slapi_is_shutting_down()){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ found_dirty_rid = 0; >+ continue; >+ } >+ if(replica_cleanallruv_check_ruv(agmt, rid_text, data->task) == 0){ >+ found_dirty_rid = 0; >+ } else { >+ found_dirty_rid = 1; >+ break; >+ } >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ } >+ /* If the task is aborted or everyone is cleaned, break out */ >+ if(is_task_aborted(data->rid)){ >+ aborted = 1; >+ goto done; >+ } >+ if(found_dirty_rid == 0){ >+ break; >+ } >+ /* >+ * need to sleep between passes >+ */ >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Replicas have not been cleaned yet, " >+ "retrying in %d seconds", interval); >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); >+ PR_Unlock( notify_lock ); >+ >+ if(interval < 14400){ /* 4 hour max */ >+ interval = interval * 2; >+ } else { >+ interval = 14400; >+ } >+ } /* while */ >+ >+done: >+ /* >+ * If the replicas are cleaned, release the rid, and trim the changelog >+ */ >+ if(!aborted){ >+ trigger_cl_trimming(data->rid); >+ delete_cleaned_rid(data->replica, data->rid, data->maxcsn); >+ cleanruv_log(data->task, CLEANALLRUV_ID, "Successfully cleaned rid(%d).", data->rid); >+ slapi_task_finish(data->task, rc); >+ } else { >+ /* >+ * Shutdown or abort >+ */ >+ if(!is_task_aborted(data->rid)){ >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Server shutting down. 
Process will resume at server startup"); >+ } else { >+ cleanruv_log(data->task, CLEANALLRUV_ID,"Task aborted for rid(%d).",data->rid); >+ } >+ if(data->task){ >+ slapi_task_finish(data->task, rc); >+ } >+ } >+ >+ if(data->payload){ >+ ber_bvfree(data->payload); >+ } >+ if(data->repl_obj && free_obj){ >+ object_release(data->repl_obj); >+ } >+ slapi_sdn_free(&data->sdn); >+ slapi_ch_free_string(&rid_text); >+ csn_free(&data->maxcsn); >+ slapi_ch_free((void **)&data); >+} >+ >+/* >+ * Waits for all the repl agmts to have the maxcsn. Returns an error only on abort or shutdown >+ */ >+static int >+check_agmts_are_caught_up(Replica *replica, ReplicaId rid, char *maxcsn, Slapi_Task *task) >+{ >+ Object *agmt_obj; >+ Repl_Agmt *agmt; >+ char *rid_text; >+ int not_all_caughtup = 1; >+ int interval = 10; >+ >+ rid_text = slapi_ch_smprintf("{replica %d ldap", rid); >+ >+ while(not_all_caughtup && !is_task_aborted(rid) && !slapi_is_shutting_down()){ >+ agmt_obj = agmtlist_get_first_agreement_for_replica (replica); >+ if(agmt_obj == NULL){ >+ not_all_caughtup = 0; >+ break; >+ } >+ while (agmt_obj){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); >+ not_all_caughtup = 0; >+ continue; >+ } >+ if(replica_cleanallruv_check_maxcsn(agmt, rid_text, maxcsn, task) == 0){ >+ not_all_caughtup = 0; >+ } else { >+ not_all_caughtup = 1; >+ break; >+ } >+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); >+ } /* agmt while */ >+ >+ if(not_all_caughtup == 0 || is_task_aborted(rid) ){ >+ break; >+ } >+ cleanruv_log(task, CLEANALLRUV_ID, "Not all replicas caught up, retrying in %d seconds",interval); >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); >+ PR_Unlock( notify_lock ); >+ >+ if(interval < 14400){ /* 4 hour max */ >+ interval = interval * 2; >+ } else { 
>+ interval = 14400; >+ } >+ } >+ slapi_ch_free_string(&rid_text); >+ >+ if(is_task_aborted(rid)){ >+ return -1; >+ } >+ >+ return not_all_caughtup; >+} >+ >+/* >+ * Waits for all the repl agmts to be online. Returns error only on abort or shutdown >+ */ >+static int >+check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task) >+{ >+ Object *agmt_obj; >+ Repl_Agmt *agmt; >+ int not_all_alive = 1; >+ int interval = 10; >+ >+ while(not_all_alive && is_task_aborted(rid) == 0 && !slapi_is_shutting_down()){ >+ agmt_obj = agmtlist_get_first_agreement_for_replica (replica); >+ if(agmt_obj == NULL){ >+ not_all_alive = 0; >+ break; >+ } >+ while (agmt_obj){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); >+ not_all_alive = 0; >+ continue; >+ } >+ if(replica_cleanallruv_replica_alive(agmt) == 0){ >+ not_all_alive = 0; >+ } else { >+ not_all_alive = 1; >+ break; >+ } >+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); >+ } >+ >+ if(not_all_alive == 0 || is_task_aborted(rid)){ >+ break; >+ } >+ cleanruv_log(task, CLEANALLRUV_ID, "Not all replicas online, retrying in %d seconds...",interval); >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); >+ PR_Unlock( notify_lock ); >+ >+ if(interval < 14400){ /* 4 hour max */ >+ interval = interval * 2; >+ } else { >+ interval = 14400; >+ } >+ } >+ if(is_task_aborted(rid)){ >+ return -1; >+ } >+ >+ return not_all_alive; >+} >+ >+/* >+ * Create the CLEANALLRUV extended op payload >+ */ >+struct berval * >+create_ruv_payload(char *value) >+{ >+ struct berval *req_data = NULL; >+ BerElement *tmp_bere = NULL; >+ >+ if ((tmp_bere = der_alloc()) == NULL){ >+ goto error; >+ } >+ if (ber_printf(tmp_bere, "{s", value) == -1){ >+ goto error; >+ } >+ if (ber_printf(tmp_bere, "}") == -1){ >+ goto error; >+ } >+ 
if (ber_flatten(tmp_bere, &req_data) == -1){ >+ goto error; >+ } >+ goto done; >+ >+error: >+ if (NULL != req_data){ >+ ber_bvfree(req_data); >+ req_data = NULL; >+ } >+ >+done: >+ if (NULL != tmp_bere){ >+ ber_free(tmp_bere, 1); >+ tmp_bere = NULL; >+ } >+ >+ return req_data; >+} >+ >+/* >+ * Manually add the CLEANRUV task to replicas that do not support >+ * the CLEANALLRUV task. >+ */ >+static void >+replica_send_cleanruv_task(Repl_Agmt *agmt, ReplicaId rid, Slapi_Task *task) >+{ >+ Repl_Connection *conn; >+ ConnResult crc = 0; >+ LDAP *ld; >+ Slapi_DN *sdn; >+ struct berval *vals[2]; >+ struct berval val; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ char *repl_dn = NULL; >+ char data[15]; >+ int rc; >+ >+ if((conn = conn_new(agmt)) == NULL){ >+ return; >+ } >+ crc = conn_connect(conn); >+ if (CONN_OPERATION_SUCCESS != crc){ >+ conn_delete_internal_ext(conn); >+ return; >+ } >+ ld = conn_get_ldap(conn); >+ if(ld == NULL){ >+ conn_delete_internal_ext(conn); >+ return; >+ } >+ val.bv_len = PR_snprintf(data, sizeof(data), "CLEANRUV%d", rid); >+ sdn = agmt_get_replarea(agmt); >+ mod.mod_op = LDAP_MOD_ADD|LDAP_MOD_BVALUES; >+ mod.mod_type = "nsds5task"; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = data; >+ mods[0] = &mod; >+ mods[1] = NULL; >+ repl_dn = slapi_create_dn_string("cn=replica,cn=%s,cn=mapping tree,cn=config", slapi_sdn_get_dn(sdn)); >+ /* >+ * Add task to remote replica >+ */ >+ rc = ldap_modify_ext_s( ld, repl_dn, mods, NULL, NULL); >+ >+ if(rc != LDAP_SUCCESS){ >+ cleanruv_log(task, CLEANALLRUV_ID, "Failed to add CLEANRUV task replica " >+ "(%s). 
You will need to manually run the CLEANRUV task on this replica (%s) error (%d)", >+ agmt_get_long_name(agmt), agmt_get_hostname(agmt), rc); >+ } >+ slapi_ch_free_string(&repl_dn); >+ slapi_sdn_free(&sdn); >+ conn_delete_internal_ext(conn); >+} >+ >+/* >+ * Check if the rid is in our list of "cleaned" rids >+ */ >+int >+is_cleaned_rid(ReplicaId rid) >+{ >+ int i; >+ >+ slapi_rwlock_rdlock(rid_lock); >+ for(i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != 0; i++){ >+ if(rid == cleaned_rids[i]){ >+ slapi_rwlock_unlock(rid_lock); >+ return 1; >+ } >+ } >+ slapi_rwlock_unlock(rid_lock); >+ >+ return 0; >+} >+ >+int >+is_task_aborted(ReplicaId rid) >+{ >+ int i; >+ >+ if(rid == 0){ >+ return 0; >+ } >+ slapi_rwlock_rdlock(abort_rid_lock); >+ for(i = 0; i < CLEANRIDSIZ && aborted_rids[i] != 0; i++){ >+ if(rid == aborted_rids[i]){ >+ slapi_rwlock_unlock(abort_rid_lock); >+ return 1; >+ } >+ } >+ slapi_rwlock_unlock(abort_rid_lock); >+ return 0; >+} >+ >+/* >+ * Just add the rid to the in-memory list, as we don't want it to survive a restart. >+ * This prevents the changelog from sending updates from this rid, and the local ruv >+ * will not be updated either. 
>+ */ >+void >+set_cleaned_rid(ReplicaId rid) >+{ >+ int i; >+ >+ slapi_rwlock_wrlock(rid_lock); >+ for(i = 0; i < CLEANRIDSIZ; i++){ >+ if(cleaned_rids[i] == 0){ >+ cleaned_rids[i] = rid; >+ cleaned_rids[i + 1] = 0; break; /* stop at the first free slot, as add_aborted_rid() does */ >+ } >+ } >+ slapi_rwlock_unlock(rid_lock); >+} >+ >+/* >+ * Add the rid and maxcsn to the repl config (so we can resume after a server restart) >+ */ >+void >+add_cleaned_rid(ReplicaId rid, Replica *r, char *maxcsn) >+{ >+ Slapi_PBlock *pb; >+ struct berval *vals[2]; >+ struct berval val; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ char data[CSN_STRSIZE + 10]; >+ char *dn; >+ int rc; >+ >+ if(r == NULL || maxcsn == NULL){ >+ return; >+ } >+ /* >+ * Write the rid & maxcsn to the config entry >+ */ >+ val.bv_len = PR_snprintf(data, sizeof(data),"%d:%s", rid, maxcsn); >+ dn = replica_get_dn(r); >+ pb = slapi_pblock_new(); >+ mod.mod_op = LDAP_MOD_ADD|LDAP_MOD_BVALUES; >+ mod.mod_type = (char *)type_replicaCleanRUV; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = data; >+ mods[0] = &mod; >+ mods[1] = NULL; >+ >+ replica_add_cleanruv_data(r, val.bv_val); >+ >+ slapi_modify_internal_set_pb (pb, dn, mods, NULL, NULL, >+ repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); >+ slapi_modify_internal_pb (pb); >+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); >+ if (rc != LDAP_SUCCESS && rc != LDAP_TYPE_OR_VALUE_EXISTS){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV Task: failed to update replica " >+ "config (%d), rid (%d)\n", rc, rid); >+ } >+ slapi_ch_free_string(&dn); >+ slapi_pblock_destroy(pb); >+} >+ >+/* >+ * Add aborted rid and repl root to config in case of a server restart >+ */ >+void >+add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root) >+{ >+ Slapi_PBlock *pb; >+ struct berval *vals[2]; >+ struct berval val; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ char *data; >+ char *dn; >+ int rc; >+ int i; >+ >+ slapi_rwlock_wrlock(abort_rid_lock); >+ for(i = 0; i < CLEANRIDSIZ; i++){ >+ 
if(aborted_rids[i] == 0){ >+ aborted_rids[i] = rid; >+ aborted_rids[i + 1] = 0; >+ break; >+ } >+ } >+ slapi_rwlock_unlock(abort_rid_lock); >+ /* >+ * Write the rid to the config entry >+ */ >+ dn = replica_get_dn(r); >+ pb = slapi_pblock_new(); >+ data = PR_smprintf("%d:%s", rid, repl_root); >+ mod.mod_op = LDAP_MOD_ADD|LDAP_MOD_BVALUES; >+ mod.mod_type = (char *)type_replicaAbortCleanRUV; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = data; >+ val.bv_len = strlen (data); >+ mods[0] = &mod; >+ mods[1] = NULL; >+ >+ slapi_modify_internal_set_pb (pb, dn, mods, NULL, NULL, >+ repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); >+ slapi_modify_internal_pb (pb); >+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); >+ if (rc != LDAP_SUCCESS){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: failed to update " >+ "replica config (%d), rid (%d)\n", rc, rid); >+ } >+ >+ slapi_ch_free_string(&dn); >+ slapi_ch_free_string(&data); >+ slapi_pblock_destroy(pb); >+} >+ >+void >+delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root){ >+ Slapi_PBlock *pb; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ struct berval *vals[2]; >+ struct berval val; >+ char *data; >+ char *dn; >+ int rc; >+ int i; >+ >+ if(r == NULL) >+ return; >+ >+ /* >+ * Remove this rid, and optimize the array >+ */ >+ slapi_rwlock_wrlock(abort_rid_lock); >+ for(i = 0; i < CLEANRIDSIZ && aborted_rids[i] != rid; i++); /* found rid, stop */ >+ for(; i < CLEANRIDSIZ; i++){ >+ /* rewrite entire array */ >+ aborted_rids[i] = aborted_rids[i + 1]; >+ } >+ slapi_rwlock_unlock(abort_rid_lock); >+ /* >+ * Prepare the mods for the config entry >+ */ >+ dn = replica_get_dn(r); >+ pb = slapi_pblock_new(); >+ data = PR_smprintf("%d:%s", (int)rid, repl_root); >+ >+ mod.mod_op = LDAP_MOD_DELETE|LDAP_MOD_BVALUES; >+ mod.mod_type = (char *)type_replicaAbortCleanRUV; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = 
data; >+ val.bv_len = strlen (data); >+ mods[0] = &mod; >+ mods[1] = NULL; >+ >+ slapi_modify_internal_set_pb(pb, dn, mods, NULL, NULL, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); >+ slapi_modify_internal_pb (pb); >+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); >+ if (rc != LDAP_SUCCESS){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort CleanAllRUV Task: failed to remove replica " >+ "config (%d), rid (%d)\n", rc, rid); >+ } >+ slapi_pblock_destroy (pb); >+ slapi_ch_free_string(&dn); >+ slapi_ch_free_string(&data); >+} >+ >+/* >+ * Remove the rid from our list, and the config >+ */ >+void >+delete_cleaned_rid(Replica *r, ReplicaId rid, CSN *maxcsn) >+{ >+ Slapi_PBlock *pb; >+ Object *agmt_obj; >+ Repl_Agmt *agmt; >+ LDAPMod *mods[2]; >+ LDAPMod mod; >+ struct berval *vals[2]; >+ struct berval val; >+ char *dn; >+ char data[CSN_STRSIZE + 10]; >+ char csnstr[CSN_STRSIZE]; >+ int rc; >+ int i; >+ >+ if(r == NULL || maxcsn == NULL) >+ return; >+ >+ /* >+ * Remove this rid, and optimize the array >+ */ >+ slapi_rwlock_wrlock(rid_lock); >+ for(i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != rid; i++); /* found rid, stop */ >+ for(; i < CLEANRIDSIZ; i++){ >+ /* rewrite entire array */ >+ cleaned_rids[i] = cleaned_rids[i + 1]; >+ } >+ slapi_rwlock_unlock(rid_lock); >+ /* >+ * Prepare the mods for the config entry >+ */ >+ csn_as_string(maxcsn, PR_FALSE, csnstr); >+ val.bv_len = PR_snprintf(data, sizeof(data), "%d:%s", (int)rid, csnstr); >+ dn = replica_get_dn(r); >+ pb = slapi_pblock_new(); >+ mod.mod_op = LDAP_MOD_DELETE|LDAP_MOD_BVALUES; >+ mod.mod_type = (char *)type_replicaCleanRUV; >+ mod.mod_bvalues = vals; >+ vals [0] = &val; >+ vals [1] = NULL; >+ val.bv_val = data; >+ mods[0] = &mod; >+ mods[1] = NULL; >+ >+ replica_remove_cleanruv_data(r, data); >+ >+ slapi_modify_internal_set_pb(pb, dn, mods, NULL, NULL, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); >+ slapi_modify_internal_pb (pb); >+ slapi_pblock_get(pb, 
SLAPI_PLUGIN_INTOP_RESULT, &rc); >+ if (rc != LDAP_SUCCESS){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "CleanAllRUV Task: failed to remove replica config " >+ "(%d), rid (%d)\n", rc, rid); >+ } >+ slapi_pblock_destroy (pb); >+ slapi_ch_free_string(&dn); >+ /* >+ * Now release the cleaned rid from the repl agmts >+ */ >+ agmt_obj = agmtlist_get_first_agreement_for_replica (r); >+ while (agmt_obj){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); >+ continue; >+ } >+ agmt_set_cleanruv_data(agmt, rid, CLEANRUV_RELEASED); >+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); >+ } >+} >+ >+/* >+ * Abort the CLEANALLRUV task >+ */ >+int >+replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, >+ int *returncode, char *returntext, void *arg) >+{ >+ PRThread *thread = NULL; >+ struct berval *payload = NULL; >+ Slapi_Task *task = NULL; >+ Replica *replica; >+ ReplicaId rid; >+ cleanruv_data *data = NULL; >+ Slapi_DN *sdn = NULL; >+ Object *r; >+ CSN *maxcsn = NULL; >+ const char *base_dn; >+ const char *rid_str; >+ const char *certify_all; >+ char *ridstr = NULL; >+ int rc = SLAPI_DSE_CALLBACK_OK; >+ >+ if(get_abort_cleanruv_task_count() >= CLEANRIDSIZ){ >+ /* we are already running the maximum number of tasks */ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Exceeded maximum number of active ABORT CLEANALLRUV tasks(%d)",CLEANRIDSIZ); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ return SLAPI_DSE_CALLBACK_ERROR; >+ } >+ >+ /* allocate new task now */ >+ task = slapi_new_task(slapi_entry_get_ndn(e)); >+ /* >+ * Get our task settings >+ */ >+ if ((rid_str = fetch_attr(e, "replica-id", 0)) == NULL){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Missing required attr \"replica-id\""); >+ 
cleanruv_log(task, ABORT_CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OBJECT_CLASS_VIOLATION; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ if ((base_dn = fetch_attr(e, "replica-base-dn", 0)) == NULL){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Missing required attr \"replica-base-dn\""); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OBJECT_CLASS_VIOLATION; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ certify_all = fetch_attr(e, "replica-certify-all", 0); >+ /* >+ * Check the rid >+ */ >+ rid = atoi(rid_str); >+ if (rid <= 0 || rid >= READ_ONLY_REPLICA_ID){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - (%s)", >+ rid, slapi_sdn_get_dn(slapi_entry_get_sdn(e))); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID,"%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ /* >+ * Get the replica object >+ */ >+ sdn = slapi_sdn_new_dn_byval(base_dn); >+ if((r = replica_get_replica_from_dn(sdn)) == NULL){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Failed to find replica from dn(%s)", base_dn); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ /* >+ * Check verify value >+ */ >+ if(certify_all){ >+ if(strcasecmp(certify_all,"yes") && strcasecmp(certify_all,"no")){ >+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid value for \"replica-certify-all\", the value " >+ "must be \"yes\" or \"no\"."); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "%s", returntext); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ } else { >+ certify_all = "no"; >+ } >+ /* >+ * Create payload >+ */ >+ ridstr = slapi_ch_smprintf("%d:%s:%s", rid, base_dn, certify_all); >+ payload = create_ruv_payload(ridstr); >+ >+ if(payload == NULL){ >+ cleanruv_log(task, 
ABORT_CLEANALLRUV_ID, "Failed to create extended op payload, aborting task"); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ /* >+ * Stop the cleaning, and delete the rid >+ */ >+ replica = (Replica*)object_get_data (r); >+ maxcsn = replica_get_cleanruv_maxcsn(replica, rid); >+ delete_cleaned_rid(replica, rid, maxcsn); >+ add_aborted_rid(rid, replica, (char *)base_dn); >+ stop_ruv_cleaning(); >+ /* >+ * Prepare the abort struct and fire off the thread >+ */ >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID,"Failed to allocate abort_cleanruv_data. Aborting task."); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ goto out; >+ } >+ data->repl_obj = r; /* released in replica_abort_task_thread() */ >+ data->replica = replica; >+ data->task = task; >+ data->payload = payload; >+ data->rid = rid; >+ data->repl_root = slapi_ch_strdup(base_dn); >+ data->sdn = NULL; >+ data->certify = slapi_ch_strdup(certify_all); >+ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == NULL) { >+ object_release(r); >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID,"Unable to create abort thread. 
Aborting task."); >+ *returncode = LDAP_OPERATIONS_ERROR; >+ rc = SLAPI_DSE_CALLBACK_ERROR; >+ } >+ >+out: >+ csn_free(&maxcsn); >+ slapi_ch_free_string(&ridstr); >+ slapi_sdn_free(&sdn); >+ >+ if(rc != SLAPI_DSE_CALLBACK_OK){ >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "Abort Task failed (%d)", rc); >+ slapi_task_finish(task, rc); >+ } >+ >+ return rc; >+} >+ >+/* >+ * Abort CLEANALLRUV task thread >+ */ >+void >+replica_abort_task_thread(void *arg) >+{ >+ cleanruv_data *data = (cleanruv_data *)arg; >+ Repl_Agmt *agmt; >+ Object *agmt_obj; >+ int agmt_not_notified = 1; >+ int interval = 10; >+ int release_it = 0; >+ >+ /* >+ * Need to build the replica from the dn >+ */ >+ if(data->replica == NULL && data->repl_obj == NULL){ >+ /* >+ * This thread was initiated at startup because the process did not finish. Due >+ * to timing issues, we need to wait to grab the replica obj until we get here. >+ */ >+ if((data->repl_obj = replica_get_replica_from_dn(data->sdn)) == NULL){ >+ cleanruv_log(data->task, ABORT_CLEANALLRUV_ID, "Failed to get replica object from dn (%s).", slapi_sdn_get_dn(data->sdn)); >+ goto done; >+ } >+ if(data->replica == NULL && data->repl_obj){ >+ data->replica = (Replica*)object_get_data(data->repl_obj); >+ } >+ release_it = 1; >+ } >+ >+ /* >+ * Now send the cleanruv extended op to all the agreements >+ */ >+ while(agmt_not_notified && !slapi_is_shutting_down()){ >+ agmt_obj = agmtlist_get_first_agreement_for_replica (data->replica); >+ if(agmt_obj == NULL){ >+ agmt_not_notified = 0; >+ break; >+ } >+ while (agmt_obj){ >+ agmt = (Repl_Agmt*)object_get_data (agmt_obj); >+ if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ agmt_not_notified = 0; >+ continue; >+ } >+ if(replica_cleanallruv_send_abort_extop(agmt, data->task, data->payload)){ >+ if(strcasecmp(data->certify,"yes") == 0){ >+ /* we are verifying all the replicas receive 
the abort task */ >+ agmt_not_notified = 1; >+ break; >+ } else { >+ /* we do not care if we could not reach a replica, just continue as if we did */ >+ agmt_not_notified = 0; >+ } >+ } else { >+ /* success */ >+ agmt_not_notified = 0; >+ } >+ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); >+ } /* while loop for agmts */ >+ >+ if(agmt_not_notified == 0){ >+ /* everybody has been contacted */ >+ break; >+ } >+ /* >+ * need to sleep between passes >+ */ >+ cleanruv_log(data->task, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval); >+ PR_Lock( notify_lock ); >+ PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); >+ PR_Unlock( notify_lock ); >+ >+ if(interval < 14400){ /* 4 hour max */ >+ interval = interval * 2; >+ } else { >+ interval = 14400; >+ } >+ } /* while */ >+ >+done: >+ if(agmt_not_notified){ >+ /* failure */ >+ cleanruv_log(data->task, ABORT_CLEANALLRUV_ID,"Abort task failed, will resume the task at the next server startup."); >+ } else { >+ /* >+ * Clean up the config >+ */ >+ delete_aborted_rid(data->replica, data->rid, data->repl_root); >+ cleanruv_log(data->task, ABORT_CLEANALLRUV_ID, "Successfully aborted cleanAllRUV task for rid(%d)", data->rid); >+ } >+ if(data->task){ >+ slapi_task_finish(data->task, agmt_not_notified); >+ } >+ if(data->repl_obj && release_it) >+ object_release(data->repl_obj); >+ if(data->payload){ >+ ber_bvfree(data->payload); >+ } >+ slapi_ch_free_string(&data->repl_root); >+ slapi_ch_free_string(&data->certify); >+ slapi_sdn_free(&data->sdn); >+ slapi_ch_free((void **)&data); >+} >+ >+static int >+replica_cleanallruv_send_abort_extop(Repl_Agmt *ra, Slapi_Task *task, struct berval *payload) >+{ >+ Repl_Connection *conn = NULL; >+ ConnResult crc = 0; >+ int msgid = 0; >+ int rc = 0; >+ >+ if((conn = conn_new(ra)) == NULL){ >+ return -1; >+ } >+ if(conn_connect(conn) == CONN_OPERATION_SUCCESS){ >+ crc = conn_send_extended_operation(conn, REPL_ABORT_CLEANRUV_OID, payload, NULL, 
&msgid); >+ /* >+ * success or failure, just return the error code >+ */ >+ rc = crc; >+ if(rc){ >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "Failed to send extop to replica(%s).", agmt_get_long_name(ra)); >+ } >+ } else { >+ cleanruv_log(task, ABORT_CLEANALLRUV_ID, "Failed to connect to replica(%s).", agmt_get_long_name(ra)); >+ rc = -1; >+ } >+ conn_delete_internal_ext(conn); >+ >+ return rc; >+} >+ >+ >+static int >+replica_cleanallruv_send_extop(Repl_Agmt *ra, ReplicaId rid, Slapi_Task *task, struct berval *payload, int check_result) >+{ >+ Repl_Connection *conn = NULL; >+ ConnResult crc = 0; >+ int msgid = 0; >+ int rc = 0; >+ >+ if((conn = conn_new(ra)) == NULL){ >+ return -1; >+ } >+ if(conn_connect(conn) == CONN_OPERATION_SUCCESS){ >+ crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, payload, NULL, &msgid); >+ if(crc == CONN_OPERATION_SUCCESS && check_result){ >+ struct berval *retsdata = NULL; >+ char *retoid = NULL; >+ >+ crc = conn_read_result_ex(conn, &retoid, &retsdata, NULL, msgid, NULL, 1); >+ if (CONN_OPERATION_SUCCESS == crc ){ >+ struct berval **ruv_bervals = NULL; >+ struct berval *data = NULL; >+ char *data_guid = NULL; >+ >+ decode_repl_ext_response(retsdata, &rc, &ruv_bervals, &data_guid, &data); >+ /* just free everything, we only wanted "rc" */ >+ slapi_ch_free_string(&data_guid); >+ if(data) >+ ber_bvfree(data); >+ if (ruv_bervals) >+ ber_bvecfree(ruv_bervals); >+ >+ if(rc == 0 ){ /* rc == 1 is success */ >+ cleanruv_log(task, CLEANALLRUV_ID,"Replica %s does not support the CLEANALLRUV task. Sending replica CLEANRUV task...", >+ slapi_sdn_get_dn(agmt_get_dn_byref(ra))); >+ /* >+ * Ok, this replica doesn't know about CLEANALLRUV, so just manually >+ * add the CLEANRUV task to the replica. 
>+ */ >+ replica_send_cleanruv_task(ra, rid, task); >+ } else { >+ /* extop was accepted */ >+ rc = 0; >+ } >+ if (NULL != retoid) >+ ldap_memfree(retoid); >+ if (NULL != retsdata) >+ ber_bvfree(retsdata); >+ } >+ agmt_set_cleanruv_data(ra, rid, CLEANRUV_NOTIFIED); >+ } else { >+ /* >+ * success or failure, just return the error code >+ */ >+ rc = crc; >+ } >+ } else { >+ rc =-1; >+ } >+ conn_delete_internal_ext(conn); >+ >+ return rc; >+} >+ >+static int >+replica_cleanallruv_check_maxcsn(Repl_Agmt *agmt, char *rid_text, char *maxcsn, Slapi_Task *task) >+{ >+ Repl_Connection *conn = NULL; >+ LDAP *ld; >+ Slapi_DN *sdn; >+ struct berval **vals; >+ LDAPMessage *result = NULL, *entry = NULL; >+ BerElement *ber; >+ char *attrs[2]; >+ char *attr = NULL; >+ char *iter = NULL; >+ char *ruv_part = NULL; >+ int found_rid = 0; >+ int part_count = 0; >+ int rc = 0, i; >+ >+ if((conn = conn_new(agmt)) == NULL){ >+ return -1; >+ } >+ >+ if(conn_connect(conn) == CONN_OPERATION_SUCCESS){ >+ attrs[0] = "nsds50ruv"; >+ attrs[1] = NULL; >+ ld = conn_get_ldap(conn); >+ if(ld == NULL){ >+ conn_delete_internal_ext(conn); >+ return -1; >+ } >+ sdn = agmt_get_replarea(agmt); >+ rc = ldap_search_ext_s(ld, slapi_sdn_get_dn(sdn), LDAP_SCOPE_SUBTREE, >+ "(&(nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff)(objectclass=nstombstone))", >+ attrs, 0, NULL, NULL, NULL, 0, &result); >+ slapi_sdn_free(&sdn); >+ if(rc != LDAP_SUCCESS){ >+ cleanruv_log(task, CLEANALLRUV_ID,"Failed to contact " >+ "agmt (%s) error (%d), will retry later.", agmt_get_long_name(agmt), rc); >+ conn_delete_internal_ext(conn); >+ return -1; >+ } >+ entry = ldap_first_entry( ld, result ); >+ if ( entry != NULL ) { >+ for ( attr = ldap_first_attribute( ld, entry, &ber ); attr != NULL; attr = ldap_next_attribute( ld, entry, ber ) ){ >+ /* make sure the attribute is nsds50ruv */ >+ if(strcasecmp(attr,"nsds50ruv") != 0){ >+ ldap_memfree( attr ); >+ continue; >+ } >+ found_rid = 0; >+ if ((vals = ldap_get_values_len( ld, entry, 
attr)) != NULL ) { >+ for ( i = 0; vals[i] && vals[i]->bv_val; i++ ) { >+ /* look for this replica */ >+ if(strstr(vals[i]->bv_val, rid_text)){ >+ found_rid = 1; >+ /* get the max csn compare it to our known max csn */ >+ ruv_part = ldap_utf8strtok_r(vals[i]->bv_val, " ", &iter); >+ for(part_count = 1; ruv_part && part_count < 5; part_count++){ >+ ruv_part = ldap_utf8strtok_r(iter, " ", &iter); >+ } >+ if(part_count == 5 && ruv_part){ >+ /* we have the maxcsn */ >+ if(strcmp(ruv_part, maxcsn)){ >+ /* we are not caught up yet, free, and return */ >+ ldap_value_free_len(vals); >+ ldap_memfree( attr ); >+ ldap_msgfree( result ); >+ if(ber){ >+ ber_free( ber, 0 ); >+ } >+ conn_delete_internal_ext(conn); >+ return -1; >+ } else { >+ /* ok this replica has all the updates from the deleted replica */ >+ rc = 0; >+ } >+ } else { >+ /* there is no maxcsn for this rid - treat it as caught up */ >+ rc = 0; >+ } >+ } >+ } >+ if(!found_rid){ >+ /* must have been cleaned already */ >+ rc = 0; >+ } >+ ldap_value_free_len(vals); >+ } >+ ldap_memfree( attr ); >+ } >+ if ( ber != NULL ) { >+ ber_free( ber, 0 ); >+ } >+ } >+ if(result) >+ ldap_msgfree( result ); >+ } else { >+ rc = -1; >+ } >+ conn_delete_internal_ext(conn); >+ >+ return rc; >+} >+ >+static int >+replica_cleanallruv_replica_alive(Repl_Agmt *agmt) >+{ >+ Repl_Connection *conn = NULL; >+ LDAP *ld = NULL; >+ LDAPMessage *result = NULL; >+ int rc = 0; >+ >+ if((conn = conn_new(agmt)) == NULL){ >+ return -1; >+ } >+ if(conn_connect(conn) == CONN_OPERATION_SUCCESS){ >+ ld = conn_get_ldap(conn); >+ if(ld == NULL){ >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: failed to get LDAP " >+ "handle from the replication agmt (%s). 
Moving on to the next agmt.\n",agmt_get_long_name(agmt)); >+ conn_delete_internal_ext(conn); >+ return -1; >+ } >+ if(ldap_search_ext_s(ld, "", LDAP_SCOPE_BASE, "objectclass=top", >+ NULL, 0, NULL, NULL, NULL, 0, &result) == LDAP_SUCCESS) >+ { >+ rc = 0; >+ } else { >+ rc = -1; >+ } >+ if(result) >+ ldap_msgfree( result ); >+ } else { >+ rc = -1; >+ } >+ conn_delete_internal_ext(conn); >+ >+ return rc; >+} >+ >+static int >+replica_cleanallruv_check_ruv(Repl_Agmt *ra, char *rid_text, Slapi_Task *task) >+{ >+ Repl_Connection *conn = NULL; >+ BerElement *ber = NULL; >+ struct berval **vals = NULL; >+ LDAPMessage *result = NULL, *entry = NULL; >+ LDAP *ld = NULL; >+ Slapi_DN *sdn; >+ char *attrs[2]; >+ char *attr = NULL; >+ int rc = 0, i; >+ >+ if((conn = conn_new(ra)) == NULL){ >+ return -1; >+ } >+ if(conn_connect(conn) == CONN_OPERATION_SUCCESS){ >+ attrs[0] = "nsds50ruv"; >+ attrs[1] = NULL; >+ ld = conn_get_ldap(conn); >+ if(ld == NULL){ >+ cleanruv_log(task, CLEANALLRUV_ID,"Failed to get LDAP handle from " >+ "the replication agmt (%s). 
Moving on to the next agmt.",agmt_get_long_name(ra)); >+ rc = -1; >+ goto done; >+ } >+ >+ sdn = agmt_get_replarea(ra); >+ rc = ldap_search_ext_s(ld, slapi_sdn_get_dn(sdn), LDAP_SCOPE_SUBTREE, >+ "(&(nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff)(objectclass=nstombstone))", >+ attrs, 0, NULL, NULL, NULL, 0, &result); >+ slapi_sdn_free(&sdn); >+ if(rc != LDAP_SUCCESS){ >+ cleanruv_log(task, CLEANALLRUV_ID,"Failed to contact " >+ "agmt (%s) error (%d), will retry later.", agmt_get_long_name(ra), rc); >+ rc = -1; >+ goto done; >+ } >+ entry = ldap_first_entry( ld, result ); >+ if ( entry != NULL ) { >+ for ( attr = ldap_first_attribute( ld, entry, &ber ); attr != NULL; attr = ldap_next_attribute( ld, entry, ber ) ){ >+ /* make sure the attribute is nsds50ruv */ >+ if(strcasecmp(attr,"nsds50ruv") != 0){ >+ ldap_memfree( attr ); >+ continue; >+ } >+ if ((vals = ldap_get_values_len( ld, entry, attr)) != NULL ) { >+ for ( i = 0; vals[i] && vals[i]->bv_val; i++ ) { >+ /* look for this replica */ >+ if(strstr(vals[i]->bv_val, rid_text)){ >+ /* rid has not been cleaned yet, free and return */ >+ rc = -1; >+ ldap_value_free_len(vals); >+ ldap_memfree( attr ); >+ if ( ber != NULL ) { >+ ber_free( ber, 0 ); >+ ber = NULL; >+ } >+ goto done; >+ } else { >+ rc = 0; >+ } >+ } >+ ldap_value_free_len(vals); >+ } >+ ldap_memfree( attr ); >+ } /* for loop */ >+ if ( ber != NULL ) { >+ ber_free( ber, 0 ); >+ } >+ } >+done: >+ if(result) >+ ldap_msgfree( result ); >+ } else { >+ rc = -1; >+ } >+ conn_delete_internal_ext(conn); >+ >+ return rc; >+} >+ >+static int >+get_cleanruv_task_count() >+{ >+ int i, count = 0; >+ >+ slapi_rwlock_wrlock(rid_lock); >+ for(i = 0; i < CLEANRIDSIZ; i++){ >+ if(cleaned_rids[i] != 0){ >+ count++; >+ } >+ } >+ slapi_rwlock_unlock(rid_lock); >+ >+ return count; >+} >+ >+static int >+get_abort_cleanruv_task_count() >+{ >+ int i, count = 0; >+ >+ slapi_rwlock_wrlock(abort_rid_lock); >+ for(i = 0; i < CLEANRIDSIZ; i++){ >+ if(aborted_rids[i] != 0){ >+ count++; >+
} >+ } >+ slapi_rwlock_unlock(abort_rid_lock); >+ >+ return count; >+} >+ >+/* >+ * Notify sleeping CLEANALLRUV threads to stop >+ */ >+void >+stop_ruv_cleaning() >+{ >+ if(notify_lock){ >+ PR_Lock( notify_lock ); >+ PR_NotifyCondVar( notify_cvar ); >+ PR_Unlock( notify_lock ); >+ } >+} >+ >+/* >+ * Write our logging to the task and error log >+ */ >+void >+cleanruv_log(Slapi_Task *task, char *task_type, char *fmt, ...) >+{ >+ va_list ap1; >+ va_list ap2; >+ va_list ap3; >+ va_list ap4; >+ char *errlog_fmt; >+ >+ va_start(ap1, fmt); >+ va_start(ap2, fmt); >+ va_start(ap3, fmt); >+ va_start(ap4, fmt); >+ >+ if(task){ >+ slapi_task_log_notice_ext(task, fmt, ap1); >+ slapi_task_log_status_ext(task, fmt, ap2); >+ slapi_task_inc_progress(task); >+ } >+ errlog_fmt = PR_smprintf("%s: %s\n",task_type, fmt); >+ slapi_log_error_ext(SLAPI_LOG_FATAL, repl_plugin_name, errlog_fmt, ap3, ap4); >+ slapi_ch_free_string(&errlog_fmt); >+ >+ va_end(ap1); >+ va_end(ap2); >+ va_end(ap3); >+ va_end(ap4); >+} > >diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c >index e0b10ae..dd32ef2 100644 >--- a/ldap/servers/plugins/replication/repl5_ruv.c >+++ b/ldap/servers/plugins/replication/repl5_ruv.c >@@ -487,9 +487,9 @@ ruv_replace_replica_purl (RUV *ruv, ReplicaId rid, const char *replica_purl) > replica = ruvGetReplica (ruv, rid); > if (replica != NULL) > { >- if (strcmp(replica->replica_purl, replica_purl)) { /* purl updated */ >+ if (replica->replica_purl == NULL || strcmp(replica->replica_purl, replica_purl)) { /* purl updated */ > /* Replace replica_purl in RUV since supplier has been updated. */ >- slapi_ch_free((void **)&(replica->replica_purl)); >+ slapi_ch_free_string(&replica->replica_purl); > replica->replica_purl = slapi_ch_strdup(replica_purl); > /* Also, reset csn and min_csn.
*/ > replica->csn = replica->min_csn = NULL; >@@ -865,8 +865,21 @@ ruv_covers_csn_internal(const RUV *ruv, const CSN *csn, PRBool strict) > replica = ruvGetReplica (ruv, rid); > if (replica == NULL) > { >- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found\n", rid); >- return_value = PR_FALSE; >+ /* >+ * We don't know anything about this replica change in the cl, mark it to be zapped. >+ * This could have been a previously cleaned ruv, but the server was restarted before >+ * the change could be trimmed. >+ * >+ * Only the change log trimming calls this function with "strict" set. So we'll return success >+ * if strict is set. >+ */ >+ if(strict){ >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found.\n", rid); >+ return_value = PR_TRUE; >+ } else { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found.\n", rid); >+ return_value = PR_FALSE; >+ } > } > else > { >@@ -907,13 +920,42 @@ ruv_covers_csn_strict(const RUV *ruv, const CSN *csn) > return rc; > } > >+/* >+ * Used by the cleanallruv task >+ * >+ * We want to return TRUE if replica is NULL, >+ * and we want to use "csn_compare() <=" >+ */ >+PRBool >+ruv_covers_csn_cleanallruv(const RUV *ruv, const CSN *csn) >+{ >+ RUVElement *replica; >+ ReplicaId rid; >+ PRBool return_value; >+ >+ if (ruv == NULL || csn == NULL){ >+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn_cleanallruv: NULL argument\n"); >+ return_value = PR_FALSE; >+ } else { >+ rid = csn_get_replicaid(csn); >+ replica = ruvGetReplica (ruv, rid); >+ if (replica == NULL){ >+ /* already cleaned */ >+ return_value = PR_TRUE; >+ } else { >+ return_value = (csn_compare (csn, replica->csn) <= 0); >+ } >+ } >+ >+ return return_value; >+} > > /* > * The function gets min{maxcsns of all ruv elements} if get_the_max=0, > * or max{maxcsns of all ruv elements} if get_the_max != 0.
> */ > static int >-ruv_get_min_or_max_csn(const RUV *ruv, CSN **csn, int get_the_max) >+ruv_get_min_or_max_csn(const RUV *ruv, CSN **csn, int get_the_max, ReplicaId rid) > { > int return_value; > >@@ -943,12 +985,18 @@ ruv_get_min_or_max_csn(const RUV *ruv, CSN **csn, int get_the_max) > { > continue; > } >- >- if (found == NULL || >- (!get_the_max && csn_compare(found, replica->csn)>0) || >- ( get_the_max && csn_compare(found, replica->csn)<0)) >- { >- found = replica->csn; >+ if(rid){ /* we are only interested in this rid's maxcsn */ >+ if(replica->rid == rid){ >+ found = replica->csn; >+ break; >+ } >+ } else { >+ if (found == NULL || >+ (!get_the_max && csn_compare(found, replica->csn)>0) || >+ ( get_the_max && csn_compare(found, replica->csn)<0)) >+ { >+ found = replica->csn; >+ } > } > } > if (found == NULL) >@@ -966,15 +1014,20 @@ ruv_get_min_or_max_csn(const RUV *ruv, CSN **csn, int get_the_max) > } > > int >+ruv_get_rid_max_csn(const RUV *ruv, CSN **csn, ReplicaId rid){ >+ return ruv_get_min_or_max_csn(ruv, csn, 1 /* get the max */, rid); >+} >+ >+int > ruv_get_max_csn(const RUV *ruv, CSN **csn) > { >- return ruv_get_min_or_max_csn(ruv, csn, 1 /* get the max */); >+ return ruv_get_min_or_max_csn(ruv, csn, 1 /* get the max */, 0 /* rid */); > } > > int > ruv_get_min_csn(const RUV *ruv, CSN **csn) > { >- return ruv_get_min_or_max_csn(ruv, csn, 0 /* get the min */); >+ return ruv_get_min_or_max_csn(ruv, csn, 0 /* get the min */, 0 /* rid */); > } > > int >@@ -1080,6 +1133,22 @@ ruv_to_bervals(const RUV *ruv, struct berval ***bvals) > return return_value; > } > >+void >+ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids) >+{ >+ RUVElement *replica; >+ int cookie; >+ int i = 0; >+ >+ for (replica = dl_get_first (ruv->elements, &cookie); replica; >+ replica = dl_get_next (ruv->elements, &cookie)) >+ { >+ if(is_cleaned_rid(replica->rid)){ >+ rids[i++] = replica->rid; >+ } >+ } >+} >+ > int > ruv_to_smod(const RUV *ruv, Slapi_Mod *smod) > { >@@ -1403,20 +1472,27 @@ 
int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn) > RUVElement* replica; > char csn_str[CSN_STRSIZE]; > int rc = RUV_SUCCESS; >+ int rid = csn_get_replicaid (csn); > > PR_ASSERT (ruv && csn); > > /* locate ruvElement */ > slapi_rwlock_wrlock (ruv->lock); >- replica = ruvGetReplica (ruv, csn_get_replicaid (csn)); >+ >+ if(is_cleaned_rid(rid)){ >+ /* return success because we want to consume the update, but not perform it */ >+ rc = RUV_COVERS_CSN; >+ goto done; >+ } >+ replica = ruvGetReplica (ruv, rid); > if (replica == NULL) > { >- replica = ruvAddReplicaNoCSN (ruv, csn_get_replicaid (csn), NULL/*purl*/); >+ replica = ruvAddReplicaNoCSN (ruv, rid, NULL/*purl*/); > if (replica == NULL) > { > if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) { > slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_add_csn_inprogress: failed to add replica" >- " that created csn %s\n", csn_as_string (csn, PR_FALSE, csn_str)); >+ " that created csn %s\n", csn_as_string (csn, PR_FALSE, csn_str)); > } > rc = RUV_MEMORY_ERROR; > goto done; >diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h >index d329dc3..944f5ed 100644 >--- a/ldap/servers/plugins/replication/repl5_ruv.h >+++ b/ldap/servers/plugins/replication/repl5_ruv.h >@@ -115,8 +115,10 @@ void ruv_set_replica_generation (RUV *ruv, const char *generation); > PRBool ruv_covers_ruv(const RUV *covering_ruv, const RUV *covered_ruv); > PRBool ruv_covers_csn(const RUV *ruv, const CSN *csn); > PRBool ruv_covers_csn_strict(const RUV *ruv, const CSN *csn); >+PRBool ruv_covers_csn_cleanallruv(const RUV *ruv, const CSN *csn); > int ruv_get_min_csn(const RUV *ruv, CSN **csn); > int ruv_get_max_csn(const RUV *ruv, CSN **csn); >+int ruv_get_rid_max_csn(const RUV *ruv, CSN **csn, ReplicaId rid); > int ruv_enumerate_elements (const RUV *ruv, FNEnumRUV fn, void *arg); > int ruv_to_smod(const RUV *ruv, Slapi_Mod *smod); > int ruv_last_modified_to_smod(const RUV *ruv, Slapi_Mod *smod); >diff --git 
a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c >index 2ff3627..a33dff3 100644 >--- a/ldap/servers/plugins/replication/repl_extop.c >+++ b/ldap/servers/plugins/replication/repl_extop.c >@@ -71,6 +71,7 @@ > * > */ > static int check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv); >+static multimaster_mtnode_extension *replica_config_get_mtnode_by_dn(const char *dn); > > static int > encode_ruv (BerElement *ber, const RUV *ruv) >@@ -1260,7 +1261,6 @@ multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb) > { > /* The ruv from the supplier may have changed. Report the change on the > consumer side */ >- > replica_update_ruv_consumer(r, connext->supplier_ruv); > } > >@@ -1315,6 +1315,381 @@ free_and_return: > } > > /* >+ * Return the mtnode extension of the dn >+ */ >+static multimaster_mtnode_extension * >+replica_config_get_mtnode_by_dn(const char *dn) >+{ >+ Slapi_DN *sdn; >+ mapping_tree_node *mtnode; >+ multimaster_mtnode_extension *ext = NULL; >+ >+ sdn = slapi_sdn_new_dn_byval(dn); >+ mtnode = slapi_get_mapping_tree_node_by_dn (sdn); >+ if (mtnode) { >+ /* check if the replica object already exists in the subtree */ >+ ext = (multimaster_mtnode_extension *)repl_con_get_ext (REPL_CON_EXT_MTNODE, mtnode); >+ } >+ slapi_sdn_free (&sdn); >+ >+ return ext; >+} >+ >+/* >+ * Decode the ber element passed to us by the cleanAllRUV task >+ */ >+int >+decode_cleanruv_payload(struct berval *extop_value, char **payload) >+{ >+ BerElement *tmp_bere = NULL; >+ int rc = 0; >+ >+ if ((tmp_bere = ber_init(extop_value)) == NULL){ >+ rc = -1; >+ goto free_and_return; >+ } >+ if (ber_scanf(tmp_bere, "{") == LBER_ERROR){ >+ rc = -1; >+ goto free_and_return; >+ } >+ if (ber_get_stringa(tmp_bere, payload) == LBER_DEFAULT){ >+ rc = -1; >+ goto free_and_return; >+ } >+ >+ if (ber_scanf(tmp_bere, "}") == LBER_ERROR){ >+ rc = -1; >+ goto free_and_return; >+ } >+ >+free_and_return: >+ if (-1 == rc){ >+ 
slapi_ch_free_string(payload); >+ } >+ if (NULL != tmp_bere){ >+ ber_free(tmp_bere, 1); >+ tmp_bere = NULL; >+ } >+ return rc; >+} >+ >+int >+multimaster_extop_abort_cleanruv(Slapi_PBlock *pb) >+{ >+ multimaster_mtnode_extension *mtnode_ext; >+ PRThread *thread = NULL; >+ cleanruv_data *data; >+ Replica *r; >+ ReplicaId rid; >+ CSN *maxcsn; >+ struct berval *extop_payload; >+ char *extop_oid; >+ char *repl_root; >+ char *payload = NULL; >+ char *certify_all; >+ char *iter; >+ int rc = 0; >+ >+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid); >+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload); >+ >+ if (NULL == extop_oid || strcmp(extop_oid, REPL_ABORT_CLEANRUV_OID) != 0 || >+ NULL == extop_payload || NULL == extop_payload->bv_val){ >+ /* something is wrong, error out */ >+ return LDAP_OPERATIONS_ERROR; >+ } >+ /* >+ * Decode the payload, and grab our settings >+ */ >+ if(decode_cleanruv_payload(extop_payload, &payload)){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort cleanAllRUV task: failed to decode payload.
Aborting ext op\n"); >+ return LDAP_OPERATIONS_ERROR; >+ } >+ rid = atoi(ldap_utf8strtok_r(payload, ":", &iter)); >+ repl_root = ldap_utf8strtok_r(iter, ":", &iter); >+ certify_all = ldap_utf8strtok_r(iter, ":", &iter); >+ >+ if(!is_cleaned_rid(rid) || is_task_aborted(rid)){ >+ /* This replica has already been aborted, or was never cleaned, or already finished cleaning */ >+ goto out; >+ } else { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort cleanAllRUV task: aborting cleanallruv task for rid(%d)\n", rid); >+ } >+ /* >+ * Get the node, so we can get the replica and its agreements >+ */ >+ if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort cleanAllRUV task: failed to get replication node " >+ "from (%s), aborting operation\n", repl_root); >+ rc = LDAP_OPERATIONS_ERROR; >+ goto out; >+ } >+ if (mtnode_ext->replica){ >+ object_acquire (mtnode_ext->replica); >+ } else { >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort cleanAllRUV task: replica is missing from (%s), " >+ "aborting operation\n",repl_root); >+ rc = LDAP_OPERATIONS_ERROR; >+ goto out; >+ } >+ r = (Replica*)object_get_data (mtnode_ext->replica); >+ if(r == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Abort cleanAllRUV task: replica is NULL, aborting task\n"); >+ rc = LDAP_OPERATIONS_ERROR; >+ goto out; >+ } >+ /* >+ * Prepare the abort data >+ */ >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ slapi_log_error( SLAPI_LOG_REPL, repl_plugin_name, "Abort cleanAllRUV task: failed to allocate " >+ "abort_cleanruv_data. 
Aborting task.\n"); >+ rc = LDAP_OPERATIONS_ERROR; >+ goto out; >+ } >+ data->repl_obj = mtnode_ext->replica; /* released in replica_abort_task_thread() */ >+ data->replica = r; >+ data->task = NULL; >+ data->payload = slapi_ch_bvdup(extop_payload); >+ data->rid = rid; >+ data->repl_root = slapi_ch_strdup(repl_root); >+ data->certify = slapi_ch_strdup(certify_all); >+ /* >+ * Stop the cleaning, and delete the rid >+ */ >+ maxcsn = replica_get_cleanruv_maxcsn(r, rid); >+ delete_cleaned_rid(r, rid, maxcsn); >+ csn_free(&maxcsn); >+ add_aborted_rid(rid, r, repl_root); >+ stop_ruv_cleaning(); >+ /* >+ * Send out the extended ops to the replicas >+ */ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == NULL) { >+ if(mtnode_ext->replica){ >+ object_release(mtnode_ext->replica); >+ } >+ slapi_log_error( SLAPI_LOG_REPL, repl_plugin_name, "Abort cleanAllRUV task: unable to create abort " >+ "thread. Aborting task.\n"); >+ slapi_ch_free_string(&data->repl_root); >+ slapi_ch_free_string(&data->certify); >+ rc = LDAP_OPERATIONS_ERROR; >+ } >+ >+out: >+ slapi_ch_free_string(&payload); >+ >+ return rc; >+} >+/* >+ * Process the REPL_CLEANRUV_OID extended operation. >+ * >+ * The payload consists of the replica ID, repl root dn, and the maxcsn. Since this is >+ * basically a replication operation, it could have originated here and bounced >+ * back from another master. So check the rid against the "cleaned_rid". If >+ * it's a match, then we were already here, and we can just return success. >+ * >+ * Otherwise, we then set the cleaned_rid from the payload, fire off extended ops >+ * to all the replica agreements on this replica. Then perform the actual >+ * cleanruv_task on this replica. 
>+ */ >+int >+multimaster_extop_cleanruv(Slapi_PBlock *pb) >+{ >+ multimaster_mtnode_extension *mtnode_ext; >+ PRThread *thread = NULL; >+ Replica *r = NULL; >+ cleanruv_data *data = NULL; >+ CSN *maxcsn = NULL; >+ struct berval *extop_payload; >+ struct berval *resp_bval = NULL; >+ BerElement *resp_bere = NULL; >+ char *extop_oid; >+ char *repl_root; >+ char *payload = NULL; >+ char *csnstr = NULL; >+ char *iter; >+ int release_it = 0; >+ int rid = 0; >+ int rc = 0; >+ >+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid); >+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload); >+ >+ if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_OID) != 0 || >+ NULL == extop_payload || NULL == extop_payload->bv_val){ >+ /* something is wrong, error out */ >+ rc = -1; >+ goto free_and_return; >+ } >+ /* >+ * Decode the payload >+ */ >+ if(decode_cleanruv_payload(extop_payload, &payload)){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to decode payload. 
Aborting ext op\n"); >+ rc = -1; >+ goto free_and_return; >+ } >+ rid = atoi(ldap_utf8strtok_r(payload, ":", &iter)); >+ repl_root = ldap_utf8strtok_r(iter, ":", &iter); >+ csnstr = ldap_utf8strtok_r(iter, ":", &iter); >+ maxcsn = csn_new(); >+ csn_init_by_string(maxcsn, csnstr); >+ /* >+ * If we already cleaned this server, just return success >+ */ >+ if(is_cleaned_rid(rid)){ >+ csn_free(&maxcsn); >+ rc = 1; >+ goto free_and_return; >+ } >+ >+ /* >+ * Get the node, so we can get the replica and its agreements >+ */ >+ if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to get replication node " >+ "from (%s), aborting operation\n", repl_root); >+ rc = -1; >+ goto free_and_return; >+ } >+ >+ if (mtnode_ext->replica){ >+ object_acquire (mtnode_ext->replica); >+ release_it = 1; >+ } >+ if (mtnode_ext->replica == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica is missing from (%s), " >+ "aborting operation\n",repl_root); >+ rc = LDAP_OPERATIONS_ERROR; >+ goto free_and_return; >+ } >+ >+ r = (Replica*)object_get_data (mtnode_ext->replica); >+ if(r == NULL){ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica is NULL, aborting task\n"); >+ rc = -1; >+ goto free_and_return; >+ } >+ >+ if(replica_get_type(r) != REPLICA_TYPE_READONLY){ >+ /* >+ * Launch the cleanruv monitoring thread. 
Once all the replicas are cleaned it will release the rid >+ * >+ * This will also release mtnode_ext->replica >+ */ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: launching cleanAllRUV thread...\n"); >+ data = (cleanruv_data*)slapi_ch_calloc(1, sizeof(cleanruv_data)); >+ if (data == NULL) { >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to allocate " >+ "cleanruv_Data\n"); >+ rc = -1; >+ goto free_and_return; >+ } >+ data->repl_obj = mtnode_ext->replica; >+ data->replica = r; >+ data->rid = rid; >+ data->task = NULL; >+ data->maxcsn = maxcsn; >+ data->payload = slapi_ch_bvdup(extop_payload); >+ >+ thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread_ext, >+ (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, >+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE); >+ if (thread == NULL) { >+ rc = -1; >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: unable to create cleanAllRUV " >+ "monitoring thread. 
Aborting task.\n"); >+ } >+ } else { /* this is a read-only consumer */ >+ /* >+ * wait for the maxcsn to be covered >+ */ >+ Object *ruv_obj; >+ const RUV *ruv; >+ >+ ruv_obj = replica_get_ruv(r); >+ ruv = object_get_data (ruv_obj); >+ >+ while(!is_task_aborted(rid) && !slapi_is_shutting_down()){ >+ if(!ruv_contains_replica(ruv, rid)){ >+ /* we've already been cleaned */ >+ break; >+ } >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: checking if we're caught up...\n"); >+ if(ruv_covers_csn_cleanallruv(ruv,maxcsn) || csn_get_replicaid(maxcsn) == 0){ >+ /* We are caught up */ >+ break; >+ } else { >+ char csnstr[CSN_STRSIZE]; >+ csn_as_string(maxcsn, PR_FALSE, csnstr); >+ slapi_log_error( SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: not ruv caught up maxcsn(%s)\n", csnstr); >+ } >+ DS_Sleep(PR_SecondsToInterval(5)); >+ } >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: we're caught up...\n"); >+ /* >+ * Set cleaned rid in memory only - does not survive a server restart >+ */ >+ set_cleaned_rid(rid); >+ /* >+ * Clean the ruv >+ */ >+ replica_execute_cleanruv_task_ext(mtnode_ext->replica, rid); >+ >+ /* free everything */ >+ object_release(ruv_obj); >+ csn_free(&maxcsn); >+ if (mtnode_ext->replica && release_it) >+ object_release (mtnode_ext->replica); >+ /* >+ * This read-only replica has no easy way to tell when it's safe to release the rid. >+ * So we won't release it, not until a server restart. 
>+ */ >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: You must restart the server if you want to reuse rid(%d).\n", rid); >+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: Successfully cleaned rid(%d).\n", rid); >+ } >+ >+free_and_return: >+ if(rc && release_it){ >+ if (mtnode_ext->replica) >+ object_release (mtnode_ext->replica); >+ } >+ if(rc) >+ csn_free(&maxcsn); >+ slapi_ch_free_string(&payload); >+ >+ /* >+ * Craft a message so we know this replica supports the task >+ */ >+ if ((resp_bere = der_alloc())){ >+ >+ ber_int_t response = 1; >+ >+ ber_printf(resp_bere, "{e}", response); >+ ber_flatten(resp_bere, &resp_bval); >+ slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval); >+ slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL); >+ /* resp_bere */ >+ if (NULL != resp_bere) >+ { >+ ber_free(resp_bere, 1); >+ } >+ /* resp_bval */ >+ if (NULL != resp_bval) >+ { >+ ber_bvfree(resp_bval); >+ } >+ } >+ >+ return rc; >+} >+ >+/* > * This plugin entry point is a noop entry > * point. It's used when registering extops that > * are only used as responses. 
We'll never receive >diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c >index f0aea12..85b0ca9 100644 >--- a/ldap/servers/plugins/replication/repl_globals.c >+++ b/ldap/servers/plugins/replication/repl_globals.c >@@ -110,6 +110,8 @@ const char *type_replicaChangeCount = "nsds5ReplicaChangeCount"; > const char *type_replicaTombstonePurgeInterval = "nsds5ReplicaTombstonePurgeInterval"; > const char *type_replicaLegacyConsumer = "nsds5ReplicaLegacyConsumer"; > const char *type_ruvElementUpdatetime = "nsruvReplicaLastModified"; >+const char *type_replicaCleanRUV = "nsds5ReplicaCleanRUV"; >+const char *type_replicaAbortCleanRUV = "nsds5ReplicaAbortCleanRUV"; > > /* Attribute names for replication agreement attributes */ > const char *type_nsds5ReplicaHost = "nsds5ReplicaHost"; >@@ -126,6 +128,7 @@ const char *type_nsds5ReplicaInitialize = "nsds5BeginReplicaRefresh"; > const char *type_nsds5ReplicaTimeout = "nsds5ReplicaTimeout"; > const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime"; > const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime"; >+const char *type_nsds5ReplicaCleanRUVnotified = "nsds5ReplicaCleanRUVNotified"; > > /* windows sync specific attributes */ > const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree"; >@@ -137,7 +140,7 @@ const char *type_nsds7DirsyncCookie = "nsds7DirsyncCookie"; > const char *type_winSyncInterval = "winSyncInterval"; > const char *type_oneWaySync = "oneWaySync"; > >-/* To Allow Consumer Initialisation when adding an agreement - */ >+/* To Allow Consumer Initialization when adding an agreement - */ > const char *type_nsds5BeginReplicaRefresh = "nsds5BeginReplicaRefresh"; > > static int repl_active_threads; >diff --git a/ldap/servers/slapd/log.c b/ldap/servers/slapd/log.c >index ebec254..529dc80 100644 >--- a/ldap/servers/slapd/log.c >+++ b/ldap/servers/slapd/log.c >@@ -1969,6 +1969,33 @@ slapi_log_error( int 
severity, char *subsystem, char *fmt, ... ) > } > > int >+slapi_log_error_ext(int severity, char *subsystem, char *fmt, va_list varg1, va_list varg2) >+{ >+ int rc = 0; >+ >+ if ( severity < SLAPI_LOG_MIN || severity > SLAPI_LOG_MAX ) { >+ (void)slapd_log_error_proc( subsystem, "slapi_log_error: invalid severity %d (message %s)\n", >+ severity, fmt ); >+ return( -1 ); >+ } >+ >+ if ( >+ #ifdef _WIN32 >+ *module_ldap_debug >+ #else >+ slapd_ldap_debug >+ #endif >+ & slapi_log_map[ severity ] ) >+ { >+ rc = slapd_log_error_proc_internal( subsystem, fmt, varg1, varg2 ); >+ } else { >+ rc = 0; /* nothing to be logged --> always return success */ >+ } >+ >+ return( rc ); >+} >+ >+int > slapi_is_loglevel_set ( const int loglevel ) > { > >diff --git a/ldap/servers/slapd/slapi-plugin.h b/ldap/servers/slapd/slapi-plugin.h >index 36683a4..e5adc17 100644 >--- a/ldap/servers/slapd/slapi-plugin.h >+++ b/ldap/servers/slapd/slapi-plugin.h >@@ -5519,6 +5519,7 @@ int slapi_log_error( int severity, char *subsystem, char *fmt, ... ) > #else > ; > #endif >+int slapi_log_error_ext( int severity, char *subsystem, char *fmt, va_list varg1, va_list varg2); > > /* allowed values for the "severity" parameter */ > #define SLAPI_LOG_FATAL 0 >@@ -6029,6 +6030,8 @@ void slapi_task_log_notice(Slapi_Task *task, char *format, ...) > #else > ; > #endif >+void slapi_task_log_status_ext(Slapi_Task *task, char *format, va_list varg); >+void slapi_task_log_notice_ext(Slapi_Task *task, char *format, va_list varg); > > /* > * slapi_new_task: create new task, fill in DN, and setup modify callback >diff --git a/ldap/servers/slapd/task.c b/ldap/servers/slapd/task.c >index 6d60775..5fc0b08 100644 >--- a/ldap/servers/slapd/task.c >+++ b/ldap/servers/slapd/task.c >@@ -215,6 +215,17 @@ void slapi_task_log_status(Slapi_Task *task, char *format, ...) > slapi_task_status_changed(task); > } > >+void slapi_task_log_status_ext(Slapi_Task *task, char *format, va_list ap) >+{ >+ if (! 
task->task_status) >+ task->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER); >+ if (! task->task_status) >+ return; /* out of memory? */ >+ >+ PR_vsnprintf(task->task_status, (10 * LOG_BUFFER), format, ap); >+ slapi_task_status_changed(task); >+} >+ > /* this adds a line to the 'nsTaskLog' value, which is cumulative (anything > * logged here is added to the end) > */ >@@ -266,6 +277,51 @@ void slapi_task_log_notice(Slapi_Task *task, char *format, ...) > slapi_task_status_changed(task); > } > >+void slapi_task_log_notice_ext(Slapi_Task *task, char *format, va_list ap) >+{ >+ char buffer[LOG_BUFFER]; >+ size_t len; >+ >+ PR_vsnprintf(buffer, LOG_BUFFER, format, ap); >+ >+ if (task->task_log_lock) { >+ PR_Lock(task->task_log_lock); >+ } >+ len = 2 + strlen(buffer) + (task->task_log ? strlen(task->task_log) : 0); >+ if ((len > MAX_SCROLLBACK_BUFFER) && task->task_log) { >+ size_t i; >+ char *newbuf; >+ >+ /* start from middle of buffer, and find next linefeed */ >+ i = strlen(task->task_log)/2; >+ while (task->task_log[i] && (task->task_log[i] != '\n')) >+ i++; >+ if (task->task_log[i]) >+ i++; >+ len = strlen(task->task_log) - i + 2 + strlen(buffer); >+ newbuf = (char *)slapi_ch_malloc(len); >+ strcpy(newbuf, task->task_log + i); >+ slapi_ch_free((void **)&task->task_log); >+ task->task_log = newbuf; >+ } else { >+ if (! task->task_log) { >+ task->task_log = (char *)slapi_ch_malloc(len); >+ task->task_log[0] = 0; >+ } else { >+ task->task_log = (char *)slapi_ch_realloc(task->task_log, len); >+ } >+ } >+ >+ if (task->task_log[0]) >+ strcat(task->task_log, "\n"); >+ strcat(task->task_log, buffer); >+ if (task->task_log_lock) { >+ PR_Unlock(task->task_log_lock); >+ } >+ >+ slapi_task_status_changed(task); >+} >+ > /* update attributes in the entry under "cn=tasks" to match the current > * status of the task. */ > void slapi_task_status_changed(Slapi_Task *task) >-- >1.7.1 >
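Review note on payload parsing: both extended-operation handlers in this patch split the decoded payload on ":" and pass the first token straight to atoi(), so a payload with a missing field would hand NULL to atoi() or to a later strdup. The following is a minimal sketch of a stricter parse, assuming the "rid:repl_root:third-field" layout described in the patch comments. The names cleanruv_fields and parse_cleanruv_payload are hypothetical and do not appear in the patch; the sketch uses plain strchr() instead of ldap_utf8strtok_r() so that it stands alone.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical holder for the three colon-separated payload fields. */
typedef struct {
    long rid;         /* replica ID */
    char *repl_root;  /* points into the caller's buffer */
    char *third;      /* maxcsn string, or the certify flag for an abort */
} cleanruv_fields;

/*
 * Split "rid:repl_root:third" in place (the buffer is modified).
 * Returns 0 on success, -1 if a field is missing or empty, or if the
 * rid is not a plain non-negative decimal integer.
 */
int
parse_cleanruv_payload(char *payload, cleanruv_fields *out)
{
    char *sep1, *sep2, *end = NULL;

    if (payload == NULL || out == NULL) {
        return -1;
    }
    /* first separator: end of the rid field */
    sep1 = strchr(payload, ':');
    if (sep1 == NULL) {
        return -1;
    }
    *sep1 = '\0';
    /* second separator: end of the repl_root field */
    sep2 = strchr(sep1 + 1, ':');
    if (sep2 == NULL) {
        return -1;
    }
    *sep2 = '\0';

    out->repl_root = sep1 + 1;
    out->third = sep2 + 1;
    if (payload[0] == '\0' || out->repl_root[0] == '\0' || out->third[0] == '\0') {
        return -1;
    }
    /* strtol lets us reject trailing garbage, which atoi silently accepts */
    errno = 0;
    out->rid = strtol(payload, &end, 10);
    if (errno != 0 || *end != '\0' || out->rid < 0) {
        return -1;
    }
    return 0;
}
```

A handler could call this right after decode_cleanruv_payload() and return LDAP_OPERATIONS_ERROR on a non-zero result, before touching any replica state. Whether the real payload can ever arrive malformed depends on the senders, which this sketch does not assume anything about.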