
[CacheResponsesPlugin] Serve out of cache. #442

Closed
wants to merge 9 commits into from
3 changes: 2 additions & 1 deletion proxy/__init__.py
@@ -11,7 +11,7 @@
from .proxy import entry_point
from .proxy import main, start
from .proxy import Proxy
from .testing.test_case import TestCase
from .testing import TestCase, ReplayTestCase

__all__ = [
# PyPi package entry_point. See
@@ -23,5 +23,6 @@
# Unit testing with proxy.py. See
# https://github.com/abhinavsingh/proxy.py#unit-testing-with-proxypy
'TestCase',
'ReplayTestCase',
'Proxy',
]
4 changes: 2 additions & 2 deletions proxy/dashboard/plugin.py
@@ -34,7 +34,7 @@ def __init__(
@abstractmethod
def methods(self) -> List[str]:
"""Return list of methods that this plugin will handle."""
pass
pass # pragma: no cover

def connected(self) -> None:
"""Invoked when client websocket handshake finishes."""
@@ -43,7 +43,7 @@ def connected(self) -> None:
@abstractmethod
def handle_message(self, message: Dict[str, Any]) -> None:
"""Handle messages for registered methods."""
pass
pass # pragma: no cover

def disconnected(self) -> None:
"""Invoked when client websocket connection gets closed."""
41 changes: 30 additions & 11 deletions proxy/http/parser.py
@@ -8,6 +8,7 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import hashlib
from urllib import parse as urlparse
from typing import TypeVar, NamedTuple, Optional, Dict, Type, Tuple, List

@@ -45,11 +46,12 @@ def __init__(self, parser_type: int) -> None:
self.type: int = parser_type
self.state: int = httpParserStates.INITIALIZED

# Total size of raw bytes passed for parsing
self.total_size: int = 0

# Buffer to hold unprocessed bytes
self.buffer: bytes = b''
# These properties clean up developer APIs. Python urlparse.urlsplit behaves
# differently for proxy requests and web requests. Web request is the one
# which is broken.
self.host: Optional[bytes] = None
self.port: Optional[int] = None
self.path: Optional[bytes] = None

self.headers: Dict[bytes, Tuple[bytes, bytes]] = {}
self.body: Optional[bytes] = None
@@ -62,12 +64,18 @@ def __init__(self, parser_type: int) -> None:

self.chunk_parser: Optional[ChunkParser] = None

# This cleans up developer APIs as Python urlparse.urlsplit behaves differently
# for incoming proxy request and incoming web request. Web request is the one
# which is broken.
self.host: Optional[bytes] = None
self.port: Optional[int] = None
self.path: Optional[bytes] = None
# Total size of raw bytes passed for parsing
self.total_size: int = 0

# Buffer to hold unprocessed bytes
self.buffer: bytes = b''

# Hash which is updated as request gets parsed
#
# TODO(abhinavsingh): This currently is a requirement
# only when cache plugin is in use, otherwise, this
# will only increase CPU & RAM usage.
self.hash: hashlib._Hash = hashlib.sha512()

@classmethod
def request(cls: Type[T], raw: bytes) -> T:
@@ -81,6 +89,15 @@ def response(cls: Type[T], raw: bytes) -> T:
parser.parse(raw)
return parser

def fingerprint(self) -> str:
"""Returns a fingerprint unique for the contents in this request.

Ideally must only be used once request has finished processing.
Otherwise, returned fingerprint will be unique for the partial
request being processed.
"""
return self.hash.hexdigest()

def header(self, key: bytes) -> bytes:
if key.lower() not in self.headers:
raise KeyError('%s not found in headers', text_(key))
@@ -142,6 +159,8 @@ def parse(self, raw: bytes) -> None:
"""Parses Http request out of raw bytes.

Check HttpParser state after parse has successfully returned."""
self.hash.update(raw)

self.total_size += len(raw)
raw = self.buffer + raw
self.buffer = b''
4 changes: 3 additions & 1 deletion proxy/http/proxy/server.py
@@ -217,7 +217,7 @@ def read_from_descriptors(self, r: Readables) -> bool:
# See https://github.com/abhinavsingh/proxy.py/issues/127 for why
# currently response parsing is disabled when TLS interception is enabled.
#
# or self.tls_interception_enabled():
# or self.flags.tls_interception_enabled():
if self.response.state == httpParserStates.COMPLETE:
self.handle_pipeline_response(raw)
else:
@@ -286,6 +286,8 @@ def on_client_data(self, raw: memoryview) -> Optional[memoryview]:
# Previous pipelined request was a WebSocket
# upgrade request. Incoming client data now
# must be treated as WebSocket protocol packets.
#
# TODO(abhinavsingh): Parse websocket frames here.
self.server.queue(raw)
return None

83 changes: 72 additions & 11 deletions proxy/plugin/cache/base.py
@@ -9,22 +9,21 @@
:license: BSD, see LICENSE for more details.
"""
import logging
from proxy.common.constants import CRLF
from typing import Optional, Any

from .store.base import CacheStore
from ...http.parser import HttpParser
from ...http.proxy import HttpProxyBasePlugin
from .store.base import CacheStore

logger = logging.getLogger(__name__)


class BaseCacheResponsesPlugin(HttpProxyBasePlugin):
"""Base cache plugin.

It requires a storage backend to work with. Storage class
must implement CacheStore interface.

Different storage backends can be used per request if required.
Cache plugin requires a storage backend to work with.
Storage class must implement the CacheStore interface.
"""

def __init__(
@@ -34,27 +33,89 @@ def __init__(
super().__init__(*args, **kwargs)
self.store: Optional[CacheStore] = None

# Multiple requests can flow over a single connection.
# Example, CONNECT followed by one or many HTTP requests.
#
# Scheme is stored as attribute as request
# object is not available across all lifecycle
# callbacks.
self.scheme: bytes = b'http'

def set_store(self, store: CacheStore) -> None:
self.store = store

def before_upstream_connection(
self, request: HttpParser) -> Optional[HttpParser]:
"""Avoid connection with upstream server if cached response exists.

Disabled for https requests when running without TLS interception.
"""
assert request.url is not None

# Store request scheme
self.scheme = request.url.scheme

# Cache plugin is enabled for HTTPS request only when
# TLS interception is also enabled.
if self.scheme == b'https' and not self.tls_interception_enabled():
return request

# Cache plugin is a no-op for CONNECT requests
# i.e. we don't cache CONNECT responses. However,
# we do skip upstream connection
#
# See https://github.com/abhinavsingh/proxy.py/issues/443
# if request.method == b'CONNECT':
# return None

assert self.store
try:
self.store.open(request)
except Exception as e:
logger.info('Caching disabled due to exception message %s', str(e))
if self.store.is_cached(request):
return None
return request

def handle_client_request(
self, request: HttpParser) -> Optional[HttpParser]:
"""If cached response exists, return response from cache."""
assert request.url is not None

# Cache plugin is enabled for HTTPS request only when
# TLS interception is also enabled.
if request.url.scheme == b'https' and not self.tls_interception_enabled():
return request

assert self.store
return self.store.cache_request(request)
if self.store.is_cached(request):
logger.info("Serving out of cache")
try:
self.store.open(request)
response = self.store.read_response(request)
self.client.queue(memoryview(response.build_response()))
finally:
self.store.close()
return None
# Request not cached, open store for writes
self.store.open(request)
return request

def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
if self.scheme == b'https' and not self.tls_interception_enabled():
return chunk

assert self.store
return self.store.cache_response_chunk(chunk)
chunk = self.store.cache_response_chunk(chunk)
if chunk.tobytes().endswith(CRLF * 2):
self.store.close()
return chunk

def on_upstream_connection_close(self) -> None:
if self.scheme == b'https' and not self.tls_interception_enabled():
return

assert self.store
self.store.close()

def tls_interception_enabled(self) -> bool:
return self.flags.ca_key_file is not None and \
self.flags.ca_cert_dir is not None and \
self.flags.ca_signing_key_file is not None and \
self.flags.ca_cert_file is not None
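
The pluggable design means a subclass only has to inject its backend via set_store(); the caching lifecycle above stays unchanged. A minimal sketch (the class name and the /tmp path are hypothetical):

```python
from typing import Any

from proxy.plugin.cache.base import BaseCacheResponsesPlugin
from proxy.plugin.cache.store.disk import OnDiskCacheStore


class TmpCacheResponsesPlugin(BaseCacheResponsesPlugin):
    """Hypothetical plugin writing cached responses under /tmp/proxy-cache."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Any CacheStore implementation can be swapped in here.
        self.set_store(
            OnDiskCacheStore(uid=self.uid, cache_dir='/tmp/proxy-cache'))
```
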
14 changes: 10 additions & 4 deletions proxy/plugin/cache/cache_responses.py
@@ -8,21 +8,27 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import multiprocessing
import os
from typing import Any

from .store.disk import OnDiskCacheStore
from .base import BaseCacheResponsesPlugin


class CacheResponsesPlugin(BaseCacheResponsesPlugin):
"""Caches response using OnDiskCacheStore."""
"""Pluggable caches response plugin.

# Dynamically enable / disable cache
ENABLED = multiprocessing.Event()
Defaults to OnDiskCacheStore.

Different storage backends may be used per request if required.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.disk_store = OnDiskCacheStore(
uid=self.uid, cache_dir=self.flags.cache_dir)
self.set_store(self.disk_store)

def cache_directory(self) -> str:
"""TODO(abhinavsingh): Turn this into a flag."""
return os.path.join(self.flags.proxy_py_data_dir, 'cache')
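
One way to exercise the plugin end to end, as a sketch: it assumes proxy.main() forwards CLI-style flags to the flag parser, and the CA file paths are placeholders. Without the TLS interception flags only plain HTTP responses are cached (see tls_interception_enabled() above).

```python
import proxy

if __name__ == '__main__':
    # Assumption: proxy.main() accepts CLI-style flags.
    # CA file paths below are placeholders for your own certificates.
    proxy.main([
        '--plugins', 'proxy.plugin.cache.cache_responses.CacheResponsesPlugin',
        # Optional: with TLS interception enabled, HTTPS responses are cached too.
        '--ca-key-file', 'ca-key.pem',
        '--ca-cert-file', 'ca-cert.pem',
        '--ca-signing-key-file', 'ca-signing-key.pem',
    ])
```
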
26 changes: 20 additions & 6 deletions proxy/plugin/cache/store/base.py
@@ -9,28 +9,42 @@
:license: BSD, see LICENSE for more details.
"""
from abc import ABC, abstractmethod
from typing import Optional
from uuid import UUID
from ....http.parser import HttpParser


class CacheStore(ABC):
"""Cache storage backends must implement this interface."""

def __init__(self, uid: UUID) -> None:
self.uid = uid

@abstractmethod
def open(self, request: HttpParser) -> None:
pass
"""Initialize resources to handle this request."""
pass # pragma: no cover

@abstractmethod
def cache_request(self, request: HttpParser) -> Optional[HttpParser]:
return request
def is_cached(self, request: HttpParser) -> bool:
"""Returns whether the request is already cached."""
pass # pragma: no cover

@abstractmethod
def cache_request(self, request: HttpParser) -> HttpParser:
"""Cache the request."""
return request # pragma: no cover

@abstractmethod
def cache_response_chunk(self, chunk: memoryview) -> memoryview:
return chunk
"""Cache response chunks as they arrive."""
return chunk # pragma: no cover

@abstractmethod
def read_response(self, request: HttpParser) -> HttpParser:
"""Reads and return cached response from store."""
pass # pragma: no cover

@abstractmethod
def close(self) -> None:
pass
"""Close any open resources."""
pass # pragma: no cover
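
Any backend implementing this interface can be plugged in. As an illustration, a hypothetical process-local in-memory store that keys entries by request fingerprint, mirroring the on-disk store below (all names here are made up):

```python
from typing import Dict, Optional
from uuid import UUID

from proxy.http.parser import HttpParser
from proxy.plugin.cache.store.base import CacheStore


class InMemoryCacheStore(CacheStore):
    """Hypothetical backend keeping cached responses in a class-level dict."""

    # Shared across instances within a single process.
    _responses: Dict[str, bytes] = {}

    def __init__(self, uid: UUID) -> None:
        super().__init__(uid)
        self._key: Optional[str] = None
        self._chunks: bytes = b''

    def open(self, request: HttpParser) -> None:
        self._key = request.fingerprint()
        self._chunks = b''

    def is_cached(self, request: HttpParser) -> bool:
        return request.fingerprint() in self._responses

    def cache_request(self, request: HttpParser) -> HttpParser:
        return request

    def cache_response_chunk(self, chunk: memoryview) -> memoryview:
        # Accumulate response bytes until close() persists them.
        self._chunks += chunk.tobytes()
        return chunk

    def read_response(self, request: HttpParser) -> HttpParser:
        return HttpParser.response(self._responses[request.fingerprint()])

    def close(self) -> None:
        if self._key is not None and self._chunks:
            self._responses[self._key] = self._chunks
        self._key, self._chunks = None, b''
```
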
34 changes: 24 additions & 10 deletions proxy/plugin/cache/store/disk.py
@@ -15,7 +15,6 @@
from uuid import UUID

from ....common.flag import flags
from ....common.utils import text_
from ....http.parser import HttpParser

from .base import CacheStore
@@ -36,24 +35,39 @@ class OnDiskCacheStore(CacheStore):
def __init__(self, uid: UUID, cache_dir: str) -> None:
super().__init__(uid)
self.cache_dir = cache_dir
self.cache_file_path: Optional[str] = None
if not os.path.isdir(self.cache_dir):
os.mkdir(self.cache_dir)
self.cache_file: Optional[BinaryIO] = None

def open(self, request: HttpParser) -> None:
self.cache_file_path = os.path.join(
self.cache_dir,
'%s-%s.txt' % (text_(request.host), self.uid.hex))
self.cache_file = open(self.cache_file_path, "wb")
if not self.cache_file:
cache_file_path = self.get_cache_file_path(request)
logger.info('Opening cache file ' + cache_file_path)
self.cache_file = open(cache_file_path, 'ab+')

def cache_request(self, request: HttpParser) -> Optional[HttpParser]:
def is_cached(self, request: HttpParser) -> bool:
return os.path.isfile(self.get_cache_file_path(request))

def cache_request(self, request: HttpParser) -> HttpParser:
return request

def cache_response_chunk(self, chunk: memoryview) -> memoryview:
if self.cache_file:
self.cache_file.write(chunk.tobytes())
assert self.cache_file is not None
self.cache_file.write(chunk.tobytes())
return chunk

def read_response(self, request: HttpParser) -> HttpParser:
assert self.cache_file is not None
self.cache_file.seek(0)
return HttpParser.response(self.cache_file.read())

def close(self) -> None:
if self.cache_file:
logger.info('Closing cache file')
self.cache_file.flush()
self.cache_file.close()
logger.info('Cached response at %s', self.cache_file_path)
self.cache_file = None

def get_cache_file_path(self, request: HttpParser) -> str:
return os.path.join(self.cache_dir, '.'.join(
[request.fingerprint(), 'cache']))