From b616a0557628731386cdd2e18a6864a65936f5be Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 11:54:08 +0100 Subject: [PATCH 01/14] Add sync-with-meta.py tool Tool to synchronize a document *and* it's metadata between two clusters. Example usage: sync-with-meta.py --verbose 192.168.10.1 192.168.10.2 key_to_sync key_to_sync2 Add --force to overwrite existing documents on the destination. Change-Id: I61dd0e825baf029373f64af3993973926e8a95a7 --- management/mc_bin_client.py | 4 +- management/sync-with-meta.py | 155 +++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) create mode 100755 management/sync-with-meta.py diff --git a/management/mc_bin_client.py b/management/mc_bin_client.py index 62fb07eab..a38632969 100644 --- a/management/mc_bin_client.py +++ b/management/mc_bin_client.py @@ -149,10 +149,10 @@ def set(self, key, exp, flags, val): """Set a value in the memcached server.""" return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val) - def setWithMeta(self, key, value, exp, flags, seqno, remote_cas): + def setWithMeta(self, key, value, exp, flags, seqno, remote_cas, cas=0): """Set a value and its meta data in the memcached server.""" return self._doMetaCmd(memcacheConstants.CMD_SET_WITH_META, - key, value, 0, exp, flags, seqno, remote_cas) + key, value, cas, exp, flags, seqno, remote_cas) def add(self, key, exp, flags, val): """Add a value in the memcached server iff it doesn't already exist.""" diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py new file mode 100755 index 000000000..56a9e78c6 --- /dev/null +++ b/management/sync-with-meta.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +# Tool which synchonises a given document between two clusters, including metadata. +# Given references to two clusters, if the document is *only* found on one +# of the two it is copied to the other. + +# If it exists on both clusters, then it will refuse to copy them. + +from __future__ import print_function + +import memcacheConstants +from mc_bin_client import MemcachedClient +from mc_bin_client import MemcachedError + +import sys +import optparse + + +def get_matching_meta(cluster, key, attempts): + """Attempt to get a matching value and metadata for a key (using get() and + getMeta(). Will retry up to attempts times. Returns a tuple of the fields + on success, else None.""" + for _ in range(attempts): + (_, cas, value) = cluster.get(key) + (deleted, flags, exp, seqno, meta_cas) = cluster.getMeta(key) + if cas == meta_cas: + break + else: + # Failed + return None + return (deleted, flags, exp, seqno, cas, value) + + +def synchronize_key(src, dest, key): + """Reads a document+metadata from the source; then attempts to set the same + doc+meta on the destination.""" + + global options + print("Key: {}".format(key)) + + try: + result = get_matching_meta(src, key, 3) + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_ENOENT: + print(" Error: no such key '{}' on souce - skipping.".format(key)) + return + else: + raise + + if not result: + print((" Error: failed to get consistant data & " + + "metadata from source - skipping.").format(key)) + return + (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result + + if options.verbose: + print((" Source : deleted:{0} flags:{1} exp:{2} " + + "seqNo:{3} CAS:{4} value:{5}...").format(s_deleted, s_flags, s_exp, + s_seqno, s_cas, s_value[:30])) + + result = None + try: + result = get_matching_meta(dest, key, 3) + except MemcachedError as e: + if e.status != memcacheConstants.ERR_KEY_ENOENT: + raise + + if result: + (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value) = result + if options.verbose: + print((" Dest before sync: deleted:{0} flags:{1} exp:{2} " + + "seqNo:{3} CAS:{4} value:{5}...").format(d_deleted, d_flags, + d_exp, d_seqno, d_cas, d_value[:30])) + + if (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) == (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value): + print(" Source and Destination match exactly - skipping.") + return + + if result and options.force: + try: + dest.setWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas, + d_cas) + + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_EEXISTS: + # CAS mismatch + print("Error: CAS mismatch setting at destination.") + else: + try: + dest.addWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas) + + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_EEXISTS: + print(("Error: key '{}' already exists on destination " + + "cluster. Run with --force to overwrite.").format(key)) + else: + raise + + # Fetch to double-check it matches: + result = get_matching_meta(dest, key, 3) + if not result: + print(("Error: failed to get consistant data & metadata from " + + "destination after set.").format(key)) + return + (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value) = result + + same = ((s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) == (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value)) + if same: + print(" OK") + else: + print("ERROR: Src & dest differ *after* setWithMeta:") + + if not same or options.verbose: + print((" Dest after sync: deleted:{0} flags:{1} " + + "exp:{2} seqNo:{3} CAS:{4}").format(d_deleted, d_flags, d_exp, + d_seqno, d_cas)) + + +def main(args): + parser = optparse.OptionParser() + parser.add_option('-b','--bucket', dest="bucket", default="default", + help="bucket to use") + parser.add_option('-f', '--force', action='store_true', dest='force', + help='Overwrite destination document if it already exists.') + parser.add_option('-v', '--verbose', action='store_true', dest='verbose', + help='Verbose') + + global options + options, args = parser.parse_args() + + bucket = options.bucket + password = "" + + if len(args) < 3: + print("Usage: sync-doc ") + exit(1) + + src_port = dest_port = 11211 + src_name = args.pop(0) + dest_name = args.pop(0) + if ':' in src_name: + (src_name, src_port) = src_name.split(':') + if ':' in dest_name: + (dest_name, dest_port) = dest_name.split(':') + + src = MemcachedClient(src_name, int(src_port)) + dest = MemcachedClient(dest_name, int(dest_port)) + src.sasl_auth_plain(bucket, password) + dest.sasl_auth_plain(bucket, password) + + for key in args: + synchronize_key(src, dest, key) + +if __name__ == '__main__': + sys.exit(main(sys.argv)) From f4a8a31a3f8397d5a4865e324d781630bdcb82ac Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 12:04:19 +0100 Subject: [PATCH 02/14] sync-with-meta: support different src/dest buckets Change-Id: Ic7ac39164f7d2d57476a1a459cc1ab9f58ca1a9f --- management/sync-with-meta.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index 56a9e78c6..9bd54b0f0 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -118,8 +118,10 @@ def synchronize_key(src, dest, key): def main(args): parser = optparse.OptionParser() - parser.add_option('-b','--bucket', dest="bucket", default="default", - help="bucket to use") + parser.add_option('-s','--source-bucket', dest="src_bucket", default="default", + help="source bucket to use") + parser.add_option('-d','--dest-bucket', dest="dest_bucket", default="default", + help="destination bucket to use") parser.add_option('-f', '--force', action='store_true', dest='force', help='Overwrite destination document if it already exists.') parser.add_option('-v', '--verbose', action='store_true', dest='verbose', @@ -128,7 +130,6 @@ def main(args): global options options, args = parser.parse_args() - bucket = options.bucket password = "" if len(args) < 3: @@ -145,8 +146,8 @@ def main(args): src = MemcachedClient(src_name, int(src_port)) dest = MemcachedClient(dest_name, int(dest_port)) - src.sasl_auth_plain(bucket, password) - dest.sasl_auth_plain(bucket, password) + src.sasl_auth_plain(options.src_bucket, password) + dest.sasl_auth_plain(options.dest_bucket, password) for key in args: synchronize_key(src, dest, key) From 60ae1e6949b1eda5a5fe87ea877f13da5a345736 Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 12:11:47 +0100 Subject: [PATCH 03/14] sync-with-meta: Add check for revIDs being greater on dest. Change-Id: I69e1b91dbd0f022336c03ab67f032e4b5648540a --- management/sync-with-meta.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index 9bd54b0f0..cb9dc64db 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -78,13 +78,20 @@ def synchronize_key(src, dest, key): if result and options.force: try: + # Check revIDs are increasing. + if d_seqno > s_seqno: + print(("Error: Destination revID '{}' greater than source " + + "revID '{}'. Cannot synchronize.").format(d_seqno, + s_seqno)) + return + dest.setWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas, d_cas) except MemcachedError as e: if e.status == memcacheConstants.ERR_KEY_EEXISTS: - # CAS mismatch - print("Error: CAS mismatch setting at destination.") + print("Error: Got EEXISTS during setWithMeta(). Possible " + + "CAS mismatch setting at destination.") else: try: dest.addWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas) From e121049d866bad82b1070a1f9dbd589008757a97 Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 12:33:38 +0100 Subject: [PATCH 04/14] sync-with-meta: Allow updating of source revID if too low Requires new option --allow-source-changes Change-Id: Icad7802f0da05120fbf3dc9052c4bcfc1b507520 --- management/sync-with-meta.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index cb9dc64db..a6848ac34 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -77,14 +77,32 @@ def synchronize_key(src, dest, key): return if result and options.force: - try: - # Check revIDs are increasing. - if d_seqno > s_seqno: + # Check revIDs are increasing. + if d_seqno > s_seqno: + if options.allow_src_changes: + # We are allowed to change source, so fix this by bumping + # up the source's to dest_revID+1. + src.setWithMeta(key, s_value, s_exp, s_flags, d_seqno + 1, + s_cas, s_cas) + # Refetch CAS, etc from new document. + result = get_matching_meta(src, key, 3) + if not result: + print((" Error: failed to get consistant data & " + + "metadata from source - skipping.").format(key)) + return + (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result + if options.verbose: + print((" Source after revID fix : deleted:{0} flags:{1} " + + "exp:{2} seqNo:{3} CAS:{4} value:{5}...").format( + s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value[:30])) + + else: print(("Error: Destination revID '{}' greater than source " + - "revID '{}'. Cannot synchronize.").format(d_seqno, - s_seqno)) + "revID '{}'. Cannot synchronize unless " + + "--allow-source-changes is enabled.").format(d_seqno, + s_seqno)) return - + try: dest.setWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas, d_cas) @@ -131,6 +149,9 @@ def main(args): help="destination bucket to use") parser.add_option('-f', '--force', action='store_true', dest='force', help='Overwrite destination document if it already exists.') + parser.add_option('-a', '--allow-source-changes', action='store_true', dest='allow_src_changes', + help=('Allow changes to the source metadata ' + + '(e.g. revID) to be made if necessary to synchronize documents.')) parser.add_option('-v', '--verbose', action='store_true', dest='verbose', help='Verbose') From f6237f443776efa8c0ecb9da77da551daaca046b Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 12:49:31 +0100 Subject: [PATCH 05/14] sync-with-meta: Delete on dest if deleted on src Change-Id: I66c7153077a7c0e3b3eabe3082ba30ce923cfff7 --- management/sync-with-meta.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index a6848ac34..55078aa43 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -21,14 +21,18 @@ def get_matching_meta(cluster, key, attempts): getMeta(). Will retry up to attempts times. Returns a tuple of the fields on success, else None.""" for _ in range(attempts): - (_, cas, value) = cluster.get(key) (deleted, flags, exp, seqno, meta_cas) = cluster.getMeta(key) + if deleted: + value = "" + break + (_, cas, value) = cluster.get(key) if cas == meta_cas: break else: # Failed return None - return (deleted, flags, exp, seqno, cas, value) + return (deleted, flags, exp, seqno, meta_cas, value) + def synchronize_key(src, dest, key): From 7c61e62ca9c3c3d32198afbcce9bcac3460418a0 Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 12:49:49 +0100 Subject: [PATCH 06/14] sync-with-meta: Simplify printing of docs Change-Id: Iefe365ef453760b04e06b894f25001fcd3fe8217 --- management/sync-with-meta.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index 55078aa43..4e952f170 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -34,6 +34,12 @@ def get_matching_meta(cluster, key, attempts): return (deleted, flags, exp, seqno, meta_cas, value) +def print_doc(title, deleted, flags, exp, seqno, cas, value): + if deleted: + value = "DELETED" + print((" {0:20} : deleted:{1} flags:{2} exp:{3} seqNo:{4} CAS:{5} " + + "value:{6}...").format(title, deleted, flags, exp, seqno, cas, + value[:30])) def synchronize_key(src, dest, key): """Reads a document+metadata from the source; then attempts to set the same @@ -58,9 +64,8 @@ def synchronize_key(src, dest, key): (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result if options.verbose: - print((" Source : deleted:{0} flags:{1} exp:{2} " + - "seqNo:{3} CAS:{4} value:{5}...").format(s_deleted, s_flags, s_exp, - s_seqno, s_cas, s_value[:30])) + print_doc("Source", s_deleted, s_flags, s_exp, s_seqno, s_cas, + s_value) result = None try: @@ -72,9 +77,8 @@ def synchronize_key(src, dest, key): if result: (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value) = result if options.verbose: - print((" Dest before sync: deleted:{0} flags:{1} exp:{2} " + - "seqNo:{3} CAS:{4} value:{5}...").format(d_deleted, d_flags, - d_exp, d_seqno, d_cas, d_value[:30])) + print_doc("Dest before sync", d_deleted, d_flags, d_exp, d_seqno, + d_cas, d_value) if (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) == (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value): print(" Source and Destination match exactly - skipping.") @@ -96,9 +100,8 @@ def synchronize_key(src, dest, key): return (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result if options.verbose: - print((" Source after revID fix : deleted:{0} flags:{1} " + - "exp:{2} seqNo:{3} CAS:{4} value:{5}...").format( - s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value[:30])) + print_doc("Source after revID fix", s_deleted, s_flags, + s_exp, s_seqno, s_cas, s_value) else: print(("Error: Destination revID '{}' greater than source " + @@ -140,9 +143,8 @@ def synchronize_key(src, dest, key): print("ERROR: Src & dest differ *after* setWithMeta:") if not same or options.verbose: - print((" Dest after sync: deleted:{0} flags:{1} " + - "exp:{2} seqNo:{3} CAS:{4}").format(d_deleted, d_flags, d_exp, - d_seqno, d_cas)) + print_doc("Dest after sync", d_deleted, d_flags, d_exp, d_seqno, d_cas, + d_value) def main(args): From 45552f804188787d9f8e7235d26c9d41c7d0f74a Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 13:51:29 +0100 Subject: [PATCH 07/14] sync-with-meta: Rename --force to --overwrite Change-Id: Ic239cb3a7be486bf56707b7f0ed2b5be27cec8c3 --- management/sync-with-meta.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index 4e952f170..be4e6f962 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -84,7 +84,7 @@ def synchronize_key(src, dest, key): print(" Source and Destination match exactly - skipping.") return - if result and options.force: + if result and options.overwrite: # Check revIDs are increasing. if d_seqno > s_seqno: if options.allow_src_changes: @@ -124,7 +124,7 @@ def synchronize_key(src, dest, key): except MemcachedError as e: if e.status == memcacheConstants.ERR_KEY_EEXISTS: print(("Error: key '{}' already exists on destination " + - "cluster. Run with --force to overwrite.").format(key)) + "cluster. Run with --overwrite to overwrite.").format(key)) else: raise @@ -153,7 +153,7 @@ def main(args): help="source bucket to use") parser.add_option('-d','--dest-bucket', dest="dest_bucket", default="default", help="destination bucket to use") - parser.add_option('-f', '--force', action='store_true', dest='force', + parser.add_option('-f', '--overwrite', action='store_true', dest='overwrite', help='Overwrite destination document if it already exists.') parser.add_option('-a', '--allow-source-changes', action='store_true', dest='allow_src_changes', help=('Allow changes to the source metadata ' + From 34b8e7eb589f60d67257fd9d3bab43fcb01e75f7 Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 14:57:08 +0100 Subject: [PATCH 08/14] sync-with-meta: Handle document deletion. cleanup / refactor Change-Id: I269fd2a21c04b58d58427b34a59a6c73b51b8c56 --- management/sync-with-meta.py | 202 ++++++++++++++++++++--------------- 1 file changed, 117 insertions(+), 85 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index be4e6f962..af202aeb0 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -16,30 +16,66 @@ import optparse +class Document: + def __init__(self, fields): + self.fields = fields + def __eq__(self, other): + return self.fields == other.fields + def deleted(self): + return self.fields[0] + def flags(self): + return self.fields[1] + def exp(self): + return self.fields[2] + def seqno(self): + return self.fields[3] + def cas(self): + return self.fields[4] + def value(self): + return self.fields[5] + + def get_matching_meta(cluster, key, attempts): """Attempt to get a matching value and metadata for a key (using get() and getMeta(). Will retry up to attempts times. Returns a tuple of the fields on success, else None.""" - for _ in range(attempts): - (deleted, flags, exp, seqno, meta_cas) = cluster.getMeta(key) - if deleted: - value = "" - break - (_, cas, value) = cluster.get(key) - if cas == meta_cas: - break - else: - # Failed - return None - return (deleted, flags, exp, seqno, meta_cas, value) + try: + for _ in range(attempts): + (deleted, flags, exp, seqno, meta_cas) = cluster.getMeta(key) + if deleted: + value = "" + break + (_, cas, value) = cluster.get(key) + if cas == meta_cas: + break + else: + # Failed + return ("EINCONSISTANT", None) + return (None, Document((deleted, flags, exp, seqno, meta_cas, value))) + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_ENOENT: + return ("ENOENT", None) + else: + raise -def print_doc(title, deleted, flags, exp, seqno, cas, value): - if deleted: +def print_doc(title, d): + if d.deleted() == 1: value = "DELETED" - print((" {0:20} : deleted:{1} flags:{2} exp:{3} seqNo:{4} CAS:{5} " + - "value:{6}...").format(title, deleted, flags, exp, seqno, cas, - value[:30])) + else: + value = d.value() + print((" {0:25} : deleted:{1} flags:{2} exp:{3} seqNo:{4} CAS:{5} " + + "value:{6}...").format(title, d.deleted(), d.flags(), d.exp(), + d.seqno(), d.cas(), value[:30])) + + +def docs_equal(src_err, src_doc, dest_err, dest_doc): + """Returns true if the given src & dest docs should be considered equal.""" + if not src_err: + return src_doc == dest_doc + else: + return src_err == "ENOENT" and dest_doc.deleted() == 1 + def synchronize_key(src, dest, key): """Reads a document+metadata from the source; then attempts to set the same @@ -48,79 +84,78 @@ def synchronize_key(src, dest, key): global options print("Key: {}".format(key)) - try: - result = get_matching_meta(src, key, 3) - except MemcachedError as e: - if e.status == memcacheConstants.ERR_KEY_ENOENT: - print(" Error: no such key '{}' on souce - skipping.".format(key)) + (src_err, src_doc) = get_matching_meta(src, key, 3) + if src_err: + if src_err == "EINCONSISTANT": + print((" Error: failed to get consistant data & metadata from " + + "source - skipping.").format(key)) return + elif src_err == "ENOENT": + if not options.delete_if_missing: + print(" Error: no such key '{}' on souce - skipping.".format(key)) + return else: raise - if not result: - print((" Error: failed to get consistant data & " + - "metadata from source - skipping.").format(key)) - return - (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result - - if options.verbose: - print_doc("Source", s_deleted, s_flags, s_exp, s_seqno, s_cas, - s_value) - - result = None - try: - result = get_matching_meta(dest, key, 3) - except MemcachedError as e: - if e.status != memcacheConstants.ERR_KEY_ENOENT: - raise + if src_doc: + if options.verbose: + print_doc("Source", src_doc) + else: + if options.verbose: + print(" Source : missing") - if result: - (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value) = result + (dest_err, dest_doc) = get_matching_meta(dest, key, 3) + if not dest_err: if options.verbose: - print_doc("Dest before sync", d_deleted, d_flags, d_exp, d_seqno, - d_cas, d_value) + print_doc("Dest before sync", dest_doc) - if (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) == (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value): - print(" Source and Destination match exactly - skipping.") + if docs_equal(src_err, src_doc, dest_err, dest_doc): + print(" Source and Destination match - skipping.") return - if result and options.overwrite: + if dest_doc and options.overwrite: # Check revIDs are increasing. - if d_seqno > s_seqno: - if options.allow_src_changes: - # We are allowed to change source, so fix this by bumping - # up the source's to dest_revID+1. - src.setWithMeta(key, s_value, s_exp, s_flags, d_seqno + 1, - s_cas, s_cas) - # Refetch CAS, etc from new document. - result = get_matching_meta(src, key, 3) - if not result: - print((" Error: failed to get consistant data & " + + if src_doc: + if dest_doc.seqno() >= src_doc.seqno(): + if options.allow_src_changes: + # We are allowed to change source, so fix this by bumping + # up the source's to dest_revID+1. + src.setWithMeta(key, src_doc.value(), src_doc.exp(), + src_doc.flags(), dest_doc.seqno() + 1, + src_doc.cas(), src_doc.cas()) + # Refetch CAS, etc from new document. + (src_err, src_doc) = get_matching_meta(src, key, 3) + if not src_doc: + print((" Error: failed to get consistant data & " + "metadata from source - skipping.").format(key)) + return + if options.verbose: + print_doc("Source after revID fix", src_doc) + + else: + print(("Error: Destination revID '{}' greater than source " + + "revID '{}'. Cannot synchronize unless " + + "--allow-source-changes is enabled.").format( + dest_doc.seqno(), src_doc.seqno())) return - (s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) = result - if options.verbose: - print_doc("Source after revID fix", s_deleted, s_flags, - s_exp, s_seqno, s_cas, s_value) + try: + dest.setWithMeta(key, src_doc.value(), src_doc.exp(), + src_doc.flags(), src_doc.seqno(), + src_doc.cas(), dest_doc.cas()) - else: - print(("Error: Destination revID '{}' greater than source " + - "revID '{}'. Cannot synchronize unless " + - "--allow-source-changes is enabled.").format(d_seqno, - s_seqno)) - return - try: - dest.setWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas, - d_cas) + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_EEXISTS: + print("Error: Got EEXISTS during setWithMeta(). Possible " + + "CAS mismatch setting at destination.") + + else: # No source document - just delete destination. + dest.delete(key) - except MemcachedError as e: - if e.status == memcacheConstants.ERR_KEY_EEXISTS: - print("Error: Got EEXISTS during setWithMeta(). Possible " + - "CAS mismatch setting at destination.") else: + # Doesn't exist yet - use addWithMeta. try: - dest.addWithMeta(key, s_value, s_exp, s_flags, s_seqno, s_cas) - + dest.addWithMeta(key, src_doc.value(), src_doc.exp(), + src_doc.flags(), src_doc.seqno(), src_doc.cas()) except MemcachedError as e: if e.status == memcacheConstants.ERR_KEY_EEXISTS: print(("Error: key '{}' already exists on destination " + @@ -129,22 +164,16 @@ def synchronize_key(src, dest, key): raise # Fetch to double-check it matches: - result = get_matching_meta(dest, key, 3) - if not result: - print(("Error: failed to get consistant data & metadata from " + - "destination after set.").format(key)) - return - (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value) = result - - same = ((s_deleted, s_flags, s_exp, s_seqno, s_cas, s_value) == (d_deleted, d_flags, d_exp, d_seqno, d_cas, d_value)) + (dest_err, dest_doc) = get_matching_meta(dest, key, 3) + + same = docs_equal(src_err, src_doc, dest_err, dest_doc) if same: print(" OK") else: print("ERROR: Src & dest differ *after* setWithMeta:") if not same or options.verbose: - print_doc("Dest after sync", d_deleted, d_flags, d_exp, d_seqno, d_cas, - d_value) + print_doc("Dest after sync", dest_doc) def main(args): @@ -153,11 +182,14 @@ def main(args): help="source bucket to use") parser.add_option('-d','--dest-bucket', dest="dest_bucket", default="default", help="destination bucket to use") - parser.add_option('-f', '--overwrite', action='store_true', dest='overwrite', + parser.add_option('-o', '--overwrite', action='store_true', dest='overwrite', help='Overwrite destination document if it already exists.') parser.add_option('-a', '--allow-source-changes', action='store_true', dest='allow_src_changes', help=('Allow changes to the source metadata ' + '(e.g. revID) to be made if necessary to synchronize documents.')) + parser.add_option('-D', '--delete-if-missing', action='store_true', + dest='delete_if_missing', help='Delete document from destingation if ' + + 'it doesn\'t exist (and no tombstone present) on the source.') parser.add_option('-v', '--verbose', action='store_true', dest='verbose', help='Verbose') From 614783635f4c47c4da9c306b2099c7dee8b9179c Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 15:16:21 +0100 Subject: [PATCH 09/14] sync-with-meta: Handle document not existing on dest or src Change-Id: Ibac54d0dfcdac37e1c98d901abae5cc1ed6db078 --- management/sync-with-meta.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index af202aeb0..de7dd4939 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -92,7 +92,9 @@ def synchronize_key(src, dest, key): return elif src_err == "ENOENT": if not options.delete_if_missing: - print(" Error: no such key '{}' on souce - skipping.".format(key)) + print((" Error: no such key '{}' on souce - skipping. If " + + "you want to delete this from destination, run with " + + "--delete-if-missing").format(key)) return else: raise @@ -152,16 +154,17 @@ def synchronize_key(src, dest, key): dest.delete(key) else: - # Doesn't exist yet - use addWithMeta. - try: - dest.addWithMeta(key, src_doc.value(), src_doc.exp(), + if src_doc: + # Doesn't exist yet - use addWithMeta. + try: + dest.addWithMeta(key, src_doc.value(), src_doc.exp(), src_doc.flags(), src_doc.seqno(), src_doc.cas()) - except MemcachedError as e: - if e.status == memcacheConstants.ERR_KEY_EEXISTS: - print(("Error: key '{}' already exists on destination " + - "cluster. Run with --overwrite to overwrite.").format(key)) - else: - raise + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_EEXISTS: + print(("Error: key '{}' already exists on destination " + + "cluster. Run with --overwrite to overwrite.").format(key)) + else: + raise # Fetch to double-check it matches: (dest_err, dest_doc) = get_matching_meta(dest, key, 3) From 80128b1d58fb3fff91f302daf4838b11fc4ddebf Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 15:20:53 +0100 Subject: [PATCH 10/14] sync-with-meta: Handle document not existing on dest or src - take2 Change-Id: I078aade4d556a5d541b9393bf2e8436f57f409dc --- management/sync-with-meta.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index de7dd4939..74008430d 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -116,8 +116,8 @@ def synchronize_key(src, dest, key): return if dest_doc and options.overwrite: - # Check revIDs are increasing. if src_doc: + # Check revIDs are increasing. if dest_doc.seqno() >= src_doc.seqno(): if options.allow_src_changes: # We are allowed to change source, so fix this by bumping @@ -165,6 +165,11 @@ def synchronize_key(src, dest, key): "cluster. Run with --overwrite to overwrite.").format(key)) else: raise + else: + # No source or destination doc - nothing to do. + print(("Error: key '{}' doesn't exist on either source or " + + "destination - ignoring.").format(key)) + return # Fetch to double-check it matches: (dest_err, dest_doc) = get_matching_meta(dest, key, 3) From 13713b17757e6b5d3516e4281e8ee236a05261ca Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 16:50:57 +0100 Subject: [PATCH 11/14] compare-meta.py Change-Id: I001cd5d434b4ba564269d6c68e3d7630375ca5d3 --- management/compare-meta.py | 93 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100755 management/compare-meta.py diff --git a/management/compare-meta.py b/management/compare-meta.py new file mode 100755 index 000000000..1d003326c --- /dev/null +++ b/management/compare-meta.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +import optparse +import time + +import memcacheConstants +from mc_bin_client import MemcachedClient +from mc_bin_client import MemcachedError + + +def check_key(src, dest, key): + global options + for _ in range(options.attempts): + src_missing = dest_missing = False + try: + src_doc = src.getMeta(key) + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_ENOENT: + src_missing = True + else: + raise + + try: + dest_doc = dest.getMeta(key) + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_ENOENT: + dest_missing = True + else: + raise + + if src_missing and dest_missing: + # Both missing, treat as a match. + return + + if src_doc == dest_doc: + # Both same, match + return + + # Differences, wait a small time for any XDCR etc to sync up before + # next iteration + time.sleep(0.01) + else: + print("*** Differences found for '{}':".format(key)) + print((" Source: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + dest_doc[0], dest_doc[1], dest_doc[2], dest_doc[3], dest_doc[4])) + print((" Destination: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + src_doc[0], src_doc[1], src_doc[2], src_doc[3], src_doc[4])) + + +def main(): + parser = optparse.OptionParser() + parser.add_option('-s','--source-bucket', dest="src_bucket", default="default", + help="source bucket to use") + parser.add_option('-d','--dest-bucket', dest="dest_bucket", default="default", + help="destination bucket to use") + parser.add_option('-v', '--verbose', action='store_true', dest='verbose', + help='Verbose') + parser.add_option('-p', '--prefix', dest='prefix', default='', help='Key prefix') + parser.add_option('-a', '--attempts', dest='attempts', default=3, + type='int', help='number of attempts to make to get matching metadata') + + global options + options, args = parser.parse_args() + + password = "" + + if len(args) < 4: + print("Usage: compare-meta Date: Mon, 16 Jun 2014 17:05:31 +0100 Subject: [PATCH 12/14] compare-meta.py: Add brief mode Change-Id: I1a303d296913012e2a6887fd4d070b8976fb5d2c --- management/compare-meta.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/management/compare-meta.py b/management/compare-meta.py index 1d003326c..ecb81cd3d 100755 --- a/management/compare-meta.py +++ b/management/compare-meta.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +from __future__ import print_function + import optparse import time @@ -40,11 +42,14 @@ def check_key(src, dest, key): # next iteration time.sleep(0.01) else: - print("*** Differences found for '{}':".format(key)) - print((" Source: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( - dest_doc[0], dest_doc[1], dest_doc[2], dest_doc[3], dest_doc[4])) - print((" Destination: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( - src_doc[0], src_doc[1], src_doc[2], src_doc[3], src_doc[4])) + if options.brief: + print('x', end='', sep='') + else: + print("*** Differences found for '{}':".format(key)) + print((" Source: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + dest_doc[0], dest_doc[1], dest_doc[2], dest_doc[3], dest_doc[4])) + print((" Destination: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + src_doc[0], src_doc[1], src_doc[2], src_doc[3], src_doc[4])) def main(): @@ -58,6 +63,7 @@ def main(): parser.add_option('-p', '--prefix', dest='prefix', default='', help='Key prefix') parser.add_option('-a', '--attempts', dest='attempts', default=3, type='int', help='number of attempts to make to get matching metadata') + parser.add_option('-b', '--brief', action='store_true', help='Brief output') global options options, args = parser.parse_args() From 9c2aa6c19812a291770471d4ad5bda3479be3b6e Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Mon, 16 Jun 2014 17:09:31 +0100 Subject: [PATCH 13/14] compare-meta.py: Flush brief mode output Change-Id: I9e5f3d2dad67b2bdeeb507e365f355b05e4816f1 --- management/compare-meta.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/management/compare-meta.py b/management/compare-meta.py index ecb81cd3d..bb2b9bcc2 100755 --- a/management/compare-meta.py +++ b/management/compare-meta.py @@ -1,8 +1,7 @@ #!/usr/bin/env python -from __future__ import print_function - import optparse +import sys import time import memcacheConstants @@ -43,13 +42,13 @@ def check_key(src, dest, key): time.sleep(0.01) else: if options.brief: - print('x', end='', sep='') + sys.stdout.write('x') else: - print("*** Differences found for '{}':".format(key)) - print((" Source: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( - dest_doc[0], dest_doc[1], dest_doc[2], dest_doc[3], dest_doc[4])) - print((" Destination: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( - src_doc[0], src_doc[1], src_doc[2], src_doc[3], src_doc[4])) + print "*** Differences found for '{}':".format(key) + print (" Source: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + dest_doc[0], dest_doc[1], dest_doc[2], dest_doc[3], dest_doc[4]) + print (" Destination: deleted:{} flags:{} exp:{} seqNo:{} CAS:{} ").format( + src_doc[0], src_doc[1], src_doc[2], src_doc[3], src_doc[4]) def main(): @@ -93,7 +92,7 @@ def main(): for n in range(key_min, key_max): key = options.prefix + str(n) check_key(src, dest, key) - + sys.stdout.flush() if __name__ == '__main__': - main() \ No newline at end of file + main() From 5c84a106a615146f8dee78d860a9235f2d8bf70e Mon Sep 17 00:00:00 2001 From: jim Date: Thu, 24 Jul 2014 16:59:32 +0100 Subject: [PATCH 14/14] Tidy up help and use stderr --- management/sync-with-meta.py | 142 +++++++++++++++++++++++------------ 1 file changed, 94 insertions(+), 48 deletions(-) diff --git a/management/sync-with-meta.py b/management/sync-with-meta.py index 74008430d..2751eb7da 100755 --- a/management/sync-with-meta.py +++ b/management/sync-with-meta.py @@ -82,41 +82,44 @@ def synchronize_key(src, dest, key): doc+meta on the destination.""" global options - print("Key: {}".format(key)) (src_err, src_doc) = get_matching_meta(src, key, 3) if src_err: if src_err == "EINCONSISTANT": print((" Error: failed to get consistant data & metadata from " + - "source - skipping.").format(key)) + "source - skipping.").format(key), file=sys.stderr) return elif src_err == "ENOENT": if not options.delete_if_missing: print((" Error: no such key '{}' on souce - skipping. If " + "you want to delete this from destination, run with " + - "--delete-if-missing").format(key)) + "--delete-if-missing").format(key), file=sys.stderr) return else: raise - if src_doc: - if options.verbose: - print_doc("Source", src_doc) - else: - if options.verbose: + if (not src_doc) and options.verbose: print(" Source : missing") (dest_err, dest_doc) = get_matching_meta(dest, key, 3) if not dest_err: - if options.verbose: - print_doc("Dest before sync", dest_doc) if docs_equal(src_err, src_doc, dest_err, dest_doc): - print(" Source and Destination match - skipping.") + if options.verbose: + print(("Key: {} Source and Destination match - " + + "skipping.").format(key)) return + # We've identified that there's a difference to resolve. + # Print the key and begin the resolution code. + print("Key: {}".format(key)) + if dest_doc and options.overwrite: + print_doc("Dest before sync", dest_doc) + if src_doc: + print_doc("Source", src_doc) + changed_source_document=False # Check revIDs are increasing. if dest_doc.seqno() >= src_doc.seqno(): if options.allow_src_changes: @@ -128,33 +131,43 @@ def synchronize_key(src, dest, key): # Refetch CAS, etc from new document. (src_err, src_doc) = get_matching_meta(src, key, 3) if not src_doc: - print((" Error: failed to get consistant data & " + - "metadata from source - skipping.").format(key)) + print(("Error: failed to get consistent data & " + + "metadata from source - skipping.") + .format(key), file=sys.stderr) return - if options.verbose: - print_doc("Source after revID fix", src_doc) + print_doc("Source after revID fix", src_doc) + changed_source_document=True + print((" Resolution - Changed on source.")) else: - print(("Error: Destination revID '{}' greater than source " + - "revID '{}'. Cannot synchronize unless " + - "--allow-source-changes is enabled.").format( - dest_doc.seqno(), src_doc.seqno())) + print(("Error: Destination revID '{}' greater than " + + "source revID '{}'. Cannot synchronize " + + "unless --allow-source-changes is enabled.") + .format(dest_doc.seqno(), src_doc.seqno()), + file=sys.stderr) return - try: - dest.setWithMeta(key, src_doc.value(), src_doc.exp(), - src_doc.flags(), src_doc.seqno(), - src_doc.cas(), dest_doc.cas()) - except MemcachedError as e: - if e.status == memcacheConstants.ERR_KEY_EEXISTS: - print("Error: Got EEXISTS during setWithMeta(). Possible " + - "CAS mismatch setting at destination.") + # If XDCR isn't enabled and we didn't change the source, try and change dest. + if not options.xdcr_enabled and not changed_source_document: + try: + dest.setWithMeta(key, src_doc.value(), src_doc.exp(), + src_doc.flags(), src_doc.seqno(), + src_doc.cas(), dest_doc.cas()) + + except MemcachedError as e: + if e.status == memcacheConstants.ERR_KEY_EEXISTS: + print("Error: Got EEXISTS during setWithMeta(). " + + "Possible CAS mismatch setting at " + + "destination.", file=sys.stderr) + print((" Resolution - Changed on dest.")) else: # No source document - just delete destination. + print((" Resolution - Deleting from destination.")) dest.delete(key) else: if src_doc: + print_doc("Source", src_doc) # Doesn't exist yet - use addWithMeta. try: dest.addWithMeta(key, src_doc.value(), src_doc.exp(), @@ -162,52 +175,85 @@ def synchronize_key(src, dest, key): except MemcachedError as e: if e.status == memcacheConstants.ERR_KEY_EEXISTS: print(("Error: key '{}' already exists on destination " + - "cluster. Run with --overwrite to overwrite.").format(key)) + "cluster. Run with --overwrite to " + + "overwrite.").format(key), file=sys.stderr) else: raise + print(" Resolution - added to destination.") else: # No source or destination doc - nothing to do. print(("Error: key '{}' doesn't exist on either source or " + - "destination - ignoring.").format(key)) + "destination - ignoring.").format(key), file=sys.stderr) return - # Fetch to double-check it matches: - (dest_err, dest_doc) = get_matching_meta(dest, key, 3) + if options.validate: + # Fetch to double-check it matches: + (dest_err, dest_doc) = get_matching_meta(dest, key, 3) - same = docs_equal(src_err, src_doc, dest_err, dest_doc) - if same: - print(" OK") - else: - print("ERROR: Src & dest differ *after* setWithMeta:") + same = docs_equal(src_err, src_doc, dest_err, dest_doc) + if same: + print(" OK") + else: + if options.xdcr_enabled: + level="WARNING" + else: + level="ERROR" - if not same or options.verbose: - print_doc("Dest after sync", dest_doc) + print(("{}: key '{}' Src & dest differ *after* setWithMeta :") + .format(level, key), file=sys.stderr) + + if not same: + print_doc("Dest after sync", dest_doc) def main(args): - parser = optparse.OptionParser() - parser.add_option('-s','--source-bucket', dest="src_bucket", default="default", + usage = "usage: %prog [options] source-cluster-IP dest-cluster-IP key1 key2 ... keyn" + parser = optparse.OptionParser(usage=usage) + parser.add_option('-s','--source-bucket', dest="src_bucket", + default="default", help="source bucket to use") - parser.add_option('-d','--dest-bucket', dest="dest_bucket", default="default", + + parser.add_option('-d','--dest-bucket', dest="dest_bucket", + default="default", help="destination bucket to use") - parser.add_option('-o', '--overwrite', action='store_true', dest='overwrite', - help='Overwrite destination document if it already exists.') - parser.add_option('-a', '--allow-source-changes', action='store_true', dest='allow_src_changes', + + parser.add_option('-o', '--overwrite', action='store_true', + dest='overwrite', + help='Overwrite destination document if it already ' + + 'exists.') + + parser.add_option('-a', '--allow-source-changes', action='store_true', + dest='allow_src_changes', help=('Allow changes to the source metadata ' + - '(e.g. revID) to be made if necessary to synchronize documents.')) + '(e.g. revID) to be made. Necessary ' + + 'to synchronize documents.')) + parser.add_option('-D', '--delete-if-missing', action='store_true', - dest='delete_if_missing', help='Delete document from destingation if ' + - 'it doesn\'t exist (and no tombstone present) on the source.') + dest='delete_if_missing', help='Delete document from ' + + 'destination if it doesn\'t exist (and no ' + + 'tombstone present) on the source.') + parser.add_option('-v', '--verbose', action='store_true', dest='verbose', help='Verbose') + parser.add_option('-x', '--xdcr_enabled', action='store_true', + dest='xdcr_enabled', + help='Set if XDCR is enabled for the specified ' + + 'bucket (between the clusters).') + + parser.add_option('-V', '--validate', action='store_true', dest='validate', + help='Validate the destination matches the source ' + + 'after a sync. Note that this adds an extra ' + + 'get/get_meta to destination and can race with ' + + 'XDCR for false positives.') + global options options, args = parser.parse_args() password = "" if len(args) < 3: - print("Usage: sync-doc ") + parser.print_help() exit(1) src_port = dest_port = 11211