Historian update #669

Open: wants to merge 9 commits into master
7 changes: 7 additions & 0 deletions archived/historian/gossipd.py
@@ -105,6 +105,9 @@ def __eq__(self, other):
)

def __len__(self):
# 5: DNS hostname; data = [1:hostname_len][hostname_len:hostname][2:port]
# (total wire length up to 1 + 255 + 2 = 258 bytes)
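# e.g. "example.com" (11 bytes) occupies 1 + 11 + 2 = 14 bytes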
if self.typ == 5:
return 1 + len(self.addr) + 2
l = {
1: 6,
2: 18,
@@ -234,6 +237,10 @@ def parse_address(b):
a.addr = b.read(10)
elif a.typ == 4:
a.addr = b.read(35)
elif a.typ == 5:
    # DNS hostname: 1-byte length, hostname bytes, then a 2-byte port,
    # matching __len__ above; the port must be consumed to keep the
    # reader aligned with the record length.
    dns_len = int.from_bytes(b.read(1), "big")
    a.addr = b.read(dns_len)
    a.port = int.from_bytes(b.read(2), "big")
else:
print(f"Unknown address type {a.typ}")
return None
156 changes: 146 additions & 10 deletions archived/historian/historian.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python3
from inotify import constants
from inotify.adapters import Inotify
import os
from pyln.client import Plugin
import pika
from sqlalchemy import create_engine
from sqlalchemy import desc
from sqlalchemy.orm import sessionmaker
@@ -61,7 +63,8 @@ def resume(self):

# deleted = (length & 0x80000000 != 0)
# important = (length & 0x40000000 != 0)
# dying = (length & 0x08000000 != 0)
length = length & (~0x80000000) & (~0x40000000) & (~0x08000000)
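# e.g. a deleted 16-byte record carries length word 0x80000010;
# masking the flag bits recovers the true length, 16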

msg = f.read(length)

@@ -86,10 +89,14 @@ def resume(self):
f.seek(self.pos)
continue

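# skip gossip_store internal record types (private channel/update,
# delete, ended and dying markers) rather than raw gossip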
if typ in [4102, 4103, 4104, 4105, 4106]:
f.seek(self.pos)
continue

if length > MAX_MSG_SIZE:
plugin.log(
    f"Unreasonably large message type {typ} at position {self.pos} ({length} bytes), skipping",
    level="warn")
continue

ev_count += 1
@@ -134,28 +141,117 @@ def tail(self):
continue


def encode_varint(value):
    """Encode an unsigned integer as a protobuf base-128 varint."""
result = bytearray()
while value >= 128:
result.append((value & 0x7F) | 0x80)
value >>= 7
result.append(value)
return bytes(result)
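# e.g. encode_varint(1) == b'\x01'; encode_varint(300) == b'\xac\x02'
# (seven bits per byte, least-significant group first, high bit = continuation)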


def field_prefix(index: int, wire_type: int) -> bytes:
    """The T part of the TLV for protobuf encoded fields.

    The low 3 bits are the wire type; the higher bits are the
    varint-encoded field number:
      0 VARINT  int32, int64, uint32, uint64, sint32, sint64, bool, enum
      1 I64     fixed64, sfixed64, double
      2 LEN     string, bytes, embedded messages, packed repeated fields
      3 SGROUP  group start (deprecated)
      4 EGROUP  group end (deprecated)
      5 I32     fixed32, sfixed32, float
    """
    return encode_varint(index << 3 | wire_type)
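# e.g. field_prefix(1, 2) == b'\x0a' (field 1, wire type 2/LEN)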


def length_delimited(data: bytes) -> bytes:
"""The LV part of the TLV for protobuf encoded fields."""
if not data:
return b'\x00'
return encode_varint(len(data)) + data
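# e.g. length_delimited(b'hi') == b'\x02hi'; length_delimited(None) == b'\x00'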


def serialize(msg: bytes, node_id: str, network: str) -> bytes:
# from GL proto/internal.proto:
# message GossipMessage {
# // The raw message as seen on the wire.
# bytes raw = 1;
#
# // For private messages such as local addition of a channel we
# // want to restrict to the node that originated the message.
# bytes node_id = 2;
#
# // Which network was the client configured to follow?
# Network network = 3;
#
# // Which peer of the node sent this message?
# bytes peer_id = 4;
# }
network_encoding = {"bitcoin": 0, "testnet": 1, "regtest": 2, "signet": 3}
# fall back to regtest for unrecognized networks
active_network = network_encoding.get(network, 2)
output = bytearray()
output.extend(field_prefix(1, 2)) # raw message tag
output.extend(length_delimited(msg)) # raw msg field
output.extend(field_prefix(2, 2)) # node_id tag
output.extend(length_delimited(None)) # leave this empty - all public.
output.extend(field_prefix(3, 0))  # network tag (enum -> varint wire type)
output.extend(encode_varint(active_network))  # varint-encoded, not length-delimited
output.extend(field_prefix(4, 2)) # peer_id tag
if node_id:
# Add our node_id if we have it (so we know who to blame.)
output.extend(length_delimited(node_id.encode("utf-8")))
else:
output.extend(length_delimited(None)) # our node id not available

return bytes(output)
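# e.g. serialize(b'\xaa', None, "bitcoin") ==
#     b'\x0a\x01\xaa\x12\x00\x18\x00\x22\x00'
# (raw = 0xaa, empty node_id, network = 0, empty peer_id)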


class Flusher(Thread):
def __init__(self, engine):
Thread.__init__(self)
self.engine = engine
self.session_maker = sessionmaker(bind=engine)
self.session = None
self.RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
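# e.g. RABBITMQ_URL="amqp://guest:guest@localhost:5672/%2f"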
self.connection = None
my_info = plugin.rpc.getinfo()
self.node_id = my_info.get("id")
self.network = my_info.get("network")

def rabbitmq_connect(self):
params = pika.URLParameters(self.RABBITMQ_URL)
self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()
plugin.log(f"message queue connected to {params.host}:{params.port}")

def run(self):
logging.info("Starting flusher")
ft = FileTailer('gossip_store')
last_flush = time.time()
total = 0

self.session = self.session_maker()
for i, e in enumerate(ft.tail()):
self.store(e)
self.publish(e)

if last_flush < time.time() - 10:
self.session.commit()
self.session = self.session_maker()
last_flush = time.time()

plugin.log("Filetailer exited...", level="warn")
if self.connection:
self.connection.close()
plugin.log("Rabbitmq connection closed.", level="warn")

def store(self, raw: bytes) -> None:
try:
@@ -175,16 +271,56 @@ def store(self, raw: bytes) -> None:

self.session.merge(cls.from_gossip(msg, raw))
except Exception as e:
logging.warning(f"Exception parsing gossip message: {e}")

def publish(self, raw: bytes) -> None:
"""Serialize and publish a gossip message to a rabbitmq exchange."""
if not self.RABBITMQ_URL:
return

try:
msg = gossipd.parse(raw)
if msg is None:
return
except Exception as e:
logging.warning(f"Could not parse gossip message: {e}")
return

if not self.connection or not self.connection.is_open:
    try:
        plugin.log("connecting to message queue")
        self.rabbitmq_connect()
    except Exception as e:
        raise Exception("could not connect to rabbitmq") from e

if isinstance(msg, (gossipd.ChannelUpdate,
                    gossipd.ChannelAnnouncement,
                    gossipd.NodeAnnouncement)):
    try:
        self.channel.basic_publish(exchange='router.gossip',
                                   # unused by fanout exchange
                                   routing_key='',
                                   body=serialize(raw, self.node_id,
                                                  self.network))
    except pika.exceptions.StreamLostError:
        # reconnect for the next message; this one is dropped
        plugin.log("lost connection to rabbitmq, reconnecting")
        self.rabbitmq_connect()



@plugin.init()
def init(plugin, configuration, options):
print(options)
engine = create_engine(options['historian-dsn'], echo=False)
try:
    Base.metadata.create_all(engine)
    plugin.engine = engine
    Flusher(engine).start()
finally:
    # dispose() replaces this engine's connection pool; the Flusher's
    # sessions will open fresh connections as needed
    engine.dispose()


@plugin.method('historian-stats')