-
Notifications
You must be signed in to change notification settings - Fork 35
Logical replication support #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
ca5b546
bb01c7d
782484b
f8b95c6
b1cba73
954879a
0741c70
f4e0bd0
f48623b
138c6cc
f652bf4
bc1002f
08ed6ef
4b279ef
d60cdcb
50e02ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
# coding: utf-8 | ||
|
||
from six import raise_from | ||
|
||
from .defaults import default_dbname, default_username | ||
from .exceptions import CatchUpException | ||
from .utils import options_string | ||
|
||
|
||
class Publication(object): | ||
def __init__(self, name, node, tables=None, dbname=None, username=None): | ||
""" | ||
Constructor | ||
|
||
Args: | ||
name: publication name | ||
node: publisher's node | ||
tables: tables list or None for all tables | ||
dbname: database name used to connect and perform subscription | ||
username: username used to connect to the database | ||
""" | ||
self.name = name | ||
self.node = node | ||
self.dbname = dbname or default_dbname() | ||
self.username = username or default_username() | ||
|
||
# create publication in database | ||
t = "table " + ", ".join(tables) if tables else "all tables" | ||
query = "create publication {} for {}" | ||
node.safe_psql(query.format(name, t), dbname=dbname, username=username) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All occurrences of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. However I had to refactor |
||
|
||
def drop(self, dbname=None, username=None): | ||
""" | ||
Drop publication | ||
""" | ||
self.node.safe_psql( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we replace There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I overlooked it |
||
"drop publication {}".format(self.name), | ||
dbname=dbname, | ||
username=username) | ||
|
||
def add_tables(self, tables, dbname=None, username=None): | ||
""" | ||
Add tables | ||
|
||
Args: | ||
tables: a list of tables to add to the publication | ||
""" | ||
if not tables: | ||
raise ValueError("Tables list is empty") | ||
|
||
query = "alter publication {} add table {}" | ||
self.node.safe_psql( | ||
query.format(self.name, ", ".join(tables)), | ||
dbname=dbname or self.dbname, | ||
username=username or self.username) | ||
|
||
|
||
class Subscription(object): | ||
def __init__(self, | ||
name, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to place |
||
node, | ||
publication, | ||
dbname=None, | ||
username=None, | ||
**kwargs): | ||
""" | ||
Constructor | ||
|
||
Args: | ||
name: subscription name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've also noticed that some doc strings end with commas while others don't. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
node: subscriber's node | ||
publication: Publication object we are subscribing to | ||
dbname: database name used to connect and perform subscription | ||
username: username used to connect to the database | ||
**kwargs: subscription parameters (see CREATE SUBSCRIPTION | ||
in PostgreSQL documentation for more information) | ||
""" | ||
self.name = name | ||
self.node = node | ||
self.pub = publication | ||
|
||
# connection info | ||
conninfo = { | ||
"dbname": self.pub.dbname, | ||
"user": self.pub.username, | ||
"host": self.pub.node.host, | ||
"port": self.pub.node.port | ||
} | ||
|
||
query = ( | ||
"create subscription {} connection '{}' publication {}").format( | ||
name, options_string(**conninfo), self.pub.name) | ||
|
||
# additional parameters | ||
if kwargs: | ||
query += " with ({})".format(options_string(**kwargs)) | ||
|
||
node.safe_psql(query, dbname=dbname, username=username) | ||
|
||
def disable(self, dbname=None, username=None): | ||
""" | ||
Disables the running subscription. | ||
""" | ||
query = "alter subscription {} disable" | ||
self.node.safe_psql(query.format(self.name), dbname=None, username=None) | ||
|
||
def enable(self, dbname=None, username=None): | ||
""" | ||
Enables the previously disabled subscription. | ||
""" | ||
query = "alter subscription {} enable" | ||
self.node.safe_psql(query.format(self.name), dbname=None, username=None) | ||
|
||
def refresh(self, copy_data=True, dbname=None, username=None): | ||
""" | ||
Disables the running subscription. | ||
""" | ||
query = "alter subscription {} refresh publication with (copy_data={})" | ||
self.node.safe_psql( | ||
query.format(self.name, copy_data), | ||
dbname=dbname, | ||
username=username) | ||
|
||
def drop(self, dbname=None, username=None): | ||
""" | ||
Drops subscription | ||
""" | ||
self.node.safe_psql( | ||
"drop subscription {}".format(self.name), | ||
dbname=dbname, | ||
username=username) | ||
|
||
def catchup(self, username=None): | ||
""" | ||
Wait until subscription catches up with publication. | ||
|
||
Args: | ||
username: remote node's user name | ||
""" | ||
query = ( | ||
"select pg_current_wal_lsn() - replay_lsn = 0 " | ||
"from pg_stat_replication where application_name = '{}'").format( | ||
self.name) | ||
|
||
try: | ||
# wait until this LSN reaches subscriber | ||
self.pub.node.poll_query_until( | ||
query=query, | ||
dbname=self.pub.dbname, | ||
username=username or self.pub.username, | ||
max_attempts=60) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally, I don't like the hard-coded number. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be a parameter here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, added |
||
except Exception as e: | ||
raise_from(CatchUpException("Failed to catch up", query), e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we enable this by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it is not supported on postgres versions below 10 and there is specific message when someone's trying to enable this feature on those versions. Besides it produces extra WAL data and hence could work slightly slower.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, i see.