diff --git a/.travis.yml b/.travis.yml index 1f55e9d1e7..540fffd6d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,6 +57,7 @@ install: - pip install grpcio-testing==1.24.3 - pip install 'google-auth >= 1.6.3, < 2' - pip install 'google-auth-oauthlib >= 0.4.1, < 0.5' + - pip install requests==2.21.0 - yarn install --ignore-engines # Uninstall older Travis numpy to avoid upgrade-in-place issues. - pip uninstall -y numpy diff --git a/RELEASE.md b/RELEASE.md index ff521e13a5..0b5548cc56 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,15 @@ +# Release 2.0.2 + +## Features + +- Improvements to [TensorBoard.dev] support: + - New `tensorboard dev list` subcommand lists all experiments uploaded to + TensorBoard.dev (#2903) + - In the event of a transient backend issue or permanent breaking change, the + uploader can now gracefully degrade and print a diagnostic (#2879) + +[TensorBoard.dev]: https://tensorboard.dev/ + # Release 2.0.1 ## Features diff --git a/tensorboard/BUILD b/tensorboard/BUILD index 7d6df761f5..772db0e373 100644 --- a/tensorboard/BUILD +++ b/tensorboard/BUILD @@ -373,6 +373,14 @@ py_library( visibility = ["//visibility:public"], ) +py_library( + name = "expect_requests_installed", + # This is a dummy rule used as a requests dependency in open-source. + # We expect requests to already be installed on the system, e.g., via + # `pip install requests`. 
+ visibility = ["//visibility:public"], +) + filegroup( name = "tf_web_library_default_typings", srcs = [ diff --git a/tensorboard/pip_package/setup.py b/tensorboard/pip_package/setup.py index a33543f872..67bfb8f892 100644 --- a/tensorboard/pip_package/setup.py +++ b/tensorboard/pip_package/setup.py @@ -32,6 +32,7 @@ 'markdown >= 2.6.8', 'numpy >= 1.12.0', 'protobuf >= 3.6.0', + 'requests >= 2.21.0, < 3', 'setuptools >= 41.0.0', 'six >= 1.10.0', 'werkzeug >= 0.11.15', diff --git a/tensorboard/uploader/BUILD b/tensorboard/uploader/BUILD index a0b25720ba..cc8a34c88f 100644 --- a/tensorboard/uploader/BUILD +++ b/tensorboard/uploader/BUILD @@ -20,6 +20,7 @@ py_library( "//tensorboard:expect_grpc_installed", "//tensorboard/uploader/proto:protos_all_py_pb2", "//tensorboard/util:grpc_util", + "@org_pythonhosted_six", ], ) @@ -56,7 +57,9 @@ py_library( ":auth", ":dev_creds", ":exporter_lib", + ":server_info", ":uploader_lib", + ":util", "//tensorboard:expect_absl_app_installed", "//tensorboard:expect_absl_flags_argparse_flags_installed", "//tensorboard:expect_absl_flags_installed", @@ -201,3 +204,29 @@ py_test( "//tensorboard:test", ], ) + +py_library( + name = "server_info", + srcs = ["server_info.py"], + deps = [ + "//tensorboard:expect_requests_installed", + "//tensorboard:version", + "//tensorboard/uploader/proto:protos_all_py_pb2", + "@com_google_protobuf//:protobuf_python", + ], +) + +py_test( + name = "server_info_test", + size = "medium", # local network requests + timeout = "short", + srcs = ["server_info_test.py"], + deps = [ + ":server_info", + "//tensorboard:expect_futures_installed", + "//tensorboard:test", + "//tensorboard:version", + "//tensorboard/uploader/proto:protos_all_py_pb2", + "@org_pocoo_werkzeug", + ], +) diff --git a/tensorboard/uploader/exporter.py b/tensorboard/uploader/exporter.py index e7421f9a24..ef6d6d7205 100644 --- a/tensorboard/uploader/exporter.py +++ b/tensorboard/uploader/exporter.py @@ -26,6 +26,8 @@ import string import time +import 
six + from tensorboard.uploader.proto import export_service_pb2 from tensorboard.uploader import util from tensorboard.util import grpc_util @@ -126,13 +128,13 @@ def export(self, read_time=None): def _request_experiment_ids(self, read_time): """Yields all of the calling user's experiment IDs, as strings.""" - request = export_service_pb2.StreamExperimentsRequest(limit=_MAX_INT64) - util.set_timestamp(request.read_timestamp, read_time) - stream = self._api.StreamExperiments( - request, metadata=grpc_util.version_metadata()) - for response in stream: - for experiment_id in response.experiment_ids: - yield experiment_id + for experiment in list_experiments(self._api, read_time=read_time): + if isinstance(experiment, export_service_pb2.Experiment): + yield experiment.experiment_id + elif isinstance(experiment, six.string_types): + yield experiment + else: + raise AssertionError("Unexpected experiment type: %r" % (experiment,)) def _request_scalar_data(self, experiment_id, read_time): """Yields JSON-serializable blocks of scalar data.""" @@ -163,6 +165,38 @@ def _request_scalar_data(self, experiment_id, read_time): } +def list_experiments(api_client, fieldmask=None, read_time=None): + """Yields all of the calling user's experiments. + + Args: + api_client: A TensorBoardExporterService stub instance. + fieldmask: An optional `export_service_pb2.ExperimentMask` value. + read_time: A fixed timestamp from which to export data, as float seconds + since epoch (like `time.time()`). Optional; defaults to the current + time. + + Yields: + For each experiment owned by the user, an `export_service_pb2.Experiment` + value, or a simple string experiment ID for older servers. 
+ """ + if read_time is None: + read_time = time.time() + request = export_service_pb2.StreamExperimentsRequest(limit=_MAX_INT64) + util.set_timestamp(request.read_timestamp, read_time) + if fieldmask: + request.experiments_mask.CopyFrom(fieldmask) + stream = api_client.StreamExperiments( + request, metadata=grpc_util.version_metadata()) + for response in stream: + if response.experiments: + for experiment in response.experiments: + yield experiment + else: + # Old servers. + for experiment_id in response.experiment_ids: + yield experiment_id + + class OutputDirectoryExistsError(ValueError): pass diff --git a/tensorboard/uploader/exporter_test.py b/tensorboard/uploader/exporter_test.py index f4e5213a86..aef6947ee3 100644 --- a/tensorboard/uploader/exporter_test.py +++ b/tensorboard/uploader/exporter_test.py @@ -45,15 +45,7 @@ class TensorBoardExporterTest(tb_test.TestCase): def _create_mock_api_client(self): - # Create a stub instance (using a test channel) in order to derive a mock - # from it with autospec enabled. Mocking TensorBoardExporterServiceStub - # itself doesn't work with autospec because grpc constructs stubs via - # metaclassing. 
- test_channel = grpc_testing.channel( - service_descriptors=[], time=grpc_testing.strict_real_time()) - stub = export_service_pb2_grpc.TensorBoardExporterServiceStub(test_channel) - mock_api_client = mock.create_autospec(stub) - return mock_api_client + return _create_mock_api_client() def _make_experiments_response(self, eids): return export_service_pb2.StreamExperimentsResponse(experiment_ids=eids) @@ -323,6 +315,62 @@ def test_propagates_mkdir_errors(self): mock_api_client.StreamExperimentData.assert_not_called() +class ListExperimentsTest(tb_test.TestCase): + + def test_experiment_ids_only(self): + mock_api_client = _create_mock_api_client() + + def stream_experiments(request, **kwargs): + del request # unused + yield export_service_pb2.StreamExperimentsResponse( + experiment_ids=["123", "456"]) + yield export_service_pb2.StreamExperimentsResponse( + experiment_ids=["789"]) + + mock_api_client.StreamExperiments = mock.Mock(wraps=stream_experiments) + gen = exporter_lib.list_experiments(mock_api_client) + mock_api_client.StreamExperiments.assert_not_called() + self.assertEqual(list(gen), ["123", "456", "789"]) + + def test_mixed_experiments_and_ids(self): + mock_api_client = _create_mock_api_client() + + def stream_experiments(request, **kwargs): + del request # unused + + # Should include `experiment_ids` when no `experiments` given. + response = export_service_pb2.StreamExperimentsResponse() + response.experiment_ids.append("123") + response.experiment_ids.append("456") + yield response + + # Should ignore `experiment_ids` in the presence of `experiments`. + response = export_service_pb2.StreamExperimentsResponse() + response.experiment_ids.append("999") # will be omitted + response.experiments.add(experiment_id="789") + response.experiments.add(experiment_id="012") + yield response + + # Should include `experiments` even when no `experiment_ids` are given. 
+ response = export_service_pb2.StreamExperimentsResponse() + response.experiments.add(experiment_id="345") + response.experiments.add(experiment_id="678") + yield response + + mock_api_client.StreamExperiments = mock.Mock(wraps=stream_experiments) + gen = exporter_lib.list_experiments(mock_api_client) + mock_api_client.StreamExperiments.assert_not_called() + expected = [ + "123", + "456", + export_service_pb2.Experiment(experiment_id="789"), + export_service_pb2.Experiment(experiment_id="012"), + export_service_pb2.Experiment(experiment_id="345"), + export_service_pb2.Experiment(experiment_id="678"), + ] + self.assertEqual(list(gen), expected) + + class MkdirPTest(tb_test.TestCase): def test_makes_full_chain(self): @@ -384,5 +432,17 @@ def test_propagates_other_errors(self): self.assertEqual(cm.exception.errno, errno.ENOENT) +def _create_mock_api_client(): + # Create a stub instance (using a test channel) in order to derive a mock + # from it with autospec enabled. Mocking TensorBoardExporterServiceStub + # itself doesn't work with autospec because grpc constructs stubs via + # metaclassing. + test_channel = grpc_testing.channel( + service_descriptors=[], time=grpc_testing.strict_real_time()) + stub = export_service_pb2_grpc.TensorBoardExporterServiceStub(test_channel) + mock_api_client = mock.create_autospec(stub) + return mock_api_client + + if __name__ == "__main__": tb_test.main() diff --git a/tensorboard/uploader/proto/BUILD b/tensorboard/uploader/proto/BUILD index 9353b897e6..bb702af388 100644 --- a/tensorboard/uploader/proto/BUILD +++ b/tensorboard/uploader/proto/BUILD @@ -6,11 +6,13 @@ licenses(["notice"]) # Apache 2.0 exports_files(["LICENSE"]) +# TODO(@wchargin): Split more granularly. 
tb_proto_library( name = "protos_all", srcs = [ "export_service.proto", "scalar.proto", + "server_info.proto", "write_service.proto", ], has_services = True, diff --git a/tensorboard/uploader/proto/export_service.proto b/tensorboard/uploader/proto/export_service.proto index 54ad5ab751..a8dba432dc 100644 --- a/tensorboard/uploader/proto/export_service.proto +++ b/tensorboard/uploader/proto/export_service.proto @@ -27,19 +27,80 @@ message StreamExperimentsRequest { string user_id = 2; // Limits the number of experiment IDs returned. This is useful to check if // user might have any data by setting limit=1. Also useful to preview the - // list of experiments. + // list of experiments. TODO(@karthikv2k): Support pagination. int64 limit = 3; - // TODO(@karthikv2k): Support pagination. + // Field mask for what experiment data to return via the `experiments` field + // on the response. If not specified, this should be interpreted the same as + // an empty message: i.e., only the experiment ID should be returned. + ExperimentMask experiments_mask = 4; } -// Streams experiment IDs returned from TensorBoard.dev. +// Streams experiment metadata (ID, creation time, etc.) from TensorBoard.dev. message StreamExperimentsResponse { - // List of experiment IDs for the experiments owned by the user. The entire - // list of experiments owned by the user is streamed in batches and each batch - // contains a list of experiment IDs. A consumer of this stream needs to - // concatenate all these lists to get the full response. The order of - // experiment IDs in the stream is not defined. + // Deprecated in favor of `experiments`. If a response has `experiments` set, + // clients should ignore `experiment_ids` entirely. Otherwise, clients should + // treat `experiment_ids` as a list of `experiments` for which only the + // `experiment_id` field is set, with the understanding that the other fields + // were not populated regardless of the requested field mask. 
+ // + // For example, the following responses should be treated the same: + // + // # Response 1 + // experiment_ids: "123" + // experiment_ids: "456" + // + // # Response 2 + // experiments { experiment_id: "123" } + // experiments { experiment_id: "456" } + // + // # Response 3 + // experiment_ids: "789" + // experiments { experiment_id: "123" } + // experiments { experiment_id: "456" } + // + // See documentation on `experiments` for batching semantics. repeated string experiment_ids = 1; + // List of experiments owned by the user. The entire list of experiments + // owned by the user is streamed in batches and each batch contains a list of + // experiments. A consumer of this stream needs to concatenate all these + // lists to get the full response. The order of experiments in the stream is + // not defined. Every response will contain at least one experiment. + // + // These messages may be partially populated, in accordance with the field + // mask given in the request. + repeated Experiment experiments = 2; +} + +// Metadata about an experiment. +message Experiment { + // Permanent ID of this experiment; e.g.: "AdYd1TgeTlaLWXx6I8JUbA". + string experiment_id = 1; + // The time that the experiment was created. + google.protobuf.Timestamp create_time = 2; + // The time that the experiment was last modified: i.e., the most recent time + // that scalars were added to the experiment. + google.protobuf.Timestamp update_time = 3; + // The number of scalars in this experiment, across all time series. + int64 num_scalars = 4; + // The number of distinct run names in this experiment. + int64 num_runs = 5; + // The number of distinct tag names in this experiment. A tag name that + // appears in multiple runs will be counted only once. + int64 num_tags = 6; +} + +// Field mask for `Experiment`. The `experiment_id` field is always implicitly +// considered to be requested. 
Other fields of `Experiment` will be populated +// if their corresponding bits in the `ExperimentMask` are set. The server may +// choose to populate fields that are not explicitly requested. +message ExperimentMask { + reserved 1; + reserved "experiment_id"; + bool create_time = 2; + bool update_time = 3; + bool num_scalars = 4; + bool num_runs = 5; + bool num_tags = 6; } // Request to stream scalars from all the runs and tags in an experiment. diff --git a/tensorboard/uploader/proto/server_info.proto b/tensorboard/uploader/proto/server_info.proto new file mode 100644 index 0000000000..ba2a592f24 --- /dev/null +++ b/tensorboard/uploader/proto/server_info.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +package tensorboard.service; + +// Request sent by uploader clients at the start of an upload session. Used to +// determine whether the client is recent enough to communicate with the +// server, and to receive any metadata needed for the upload session. +message ServerInfoRequest { + // Client-side TensorBoard version, per `tensorboard.version.VERSION`. + string version = 1; +} + +message ServerInfoResponse { + // Primary bottom-line: is the server compatible with the client, and is + // there anything that the end user should be aware of? + Compatibility compatibility = 1; + // Identifier for a gRPC server providing the `TensorBoardExporterService` and + // `TensorBoardWriterService` services (under the `tensorboard.service` proto + // package). + ApiServer api_server = 2; + // How to generate URLs to experiment pages. + ExperimentUrlFormat url_format = 3; +} + +enum CompatibilityVerdict { + VERDICT_UNKNOWN = 0; + // All is well. The client may proceed. + VERDICT_OK = 1; + // The client may proceed, but should heed the accompanying message. This + // may be the case if the user is on a version of TensorBoard that will + // soon be unsupported, or if the server is experiencing transient issues. 
+ VERDICT_WARN = 2; + // The client should cease further communication with the server and abort + // operation after printing the accompanying `details` message. + VERDICT_ERROR = 3; +} + +message Compatibility { + CompatibilityVerdict verdict = 1; + // Human-readable message to display. When non-empty, will be displayed in + // all cases, even when the client may proceed. + string details = 2; +} + +message ApiServer { + // gRPC server URI (see https://github.com/grpc/grpc/blob/master/doc/naming.md). + // For example: "api.tensorboard.dev:443". + string endpoint = 1; +} + +message ExperimentUrlFormat { + // Template string for experiment URLs. All occurrences of the value of the + // `id_placeholder` field in this template string should be replaced with an + // experiment ID. For example, if `id_placeholder` is "{{EID}}", then + // `template` might be "https://tensorboard.dev/experiment/{{EID}}/". + // Should be absolute. + string template = 1; + // Placeholder string that should be replaced with an actual experiment ID. + // (See docs for `template` field.) + string id_placeholder = 2; +} diff --git a/tensorboard/uploader/server_info.py b/tensorboard/uploader/server_info.py new file mode 100644 index 0000000000..5906bb11b3 --- /dev/null +++ b/tensorboard/uploader/server_info.py @@ -0,0 +1,117 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Initial server communication to determine session parameters.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from google.protobuf import message +import requests + +from tensorboard import version +from tensorboard.uploader.proto import server_info_pb2 + + +# Request timeout for communicating with remote server. +_REQUEST_TIMEOUT_SECONDS = 10 + + +def _server_info_request(): + request = server_info_pb2.ServerInfoRequest() + request.version = version.VERSION + return request + + +def fetch_server_info(origin): + """Fetches server info from a remote server. + + Args: + origin: The server with which to communicate. Should be a string + like "https://tensorboard.dev", including protocol, host, and (if + needed) port. + + Returns: + A `server_info_pb2.ServerInfoResponse` message. + + Raises: + CommunicationError: Upon failure to connect to or successfully + communicate with the remote server. + """ + endpoint = "%s/api/uploader" % origin + post_body = _server_info_request().SerializeToString() + try: + response = requests.post( + endpoint, + data=post_body, + timeout=_REQUEST_TIMEOUT_SECONDS, + headers={"User-Agent": "tensorboard/%s" % version.VERSION}, + ) + except requests.RequestException as e: + raise CommunicationError("Failed to connect to backend: %s" % e) + if not response.ok: + raise CommunicationError( + "Non-OK status from backend (%d %s): %r" + % (response.status_code, response.reason, response.content) + ) + try: + return server_info_pb2.ServerInfoResponse.FromString(response.content) + except message.DecodeError as e: + raise CommunicationError( + "Corrupt response from backend (%s): %r" % (e, response.content) + ) + + +def create_server_info(frontend_origin, api_endpoint): + """Manually creates server info given a frontend and backend. 
+ + Args: + frontend_origin: The origin of the TensorBoard.dev frontend, like + "https://tensorboard.dev" or "http://localhost:8000". + api_endpoint: As to `server_info_pb2.ApiServer.endpoint`. + + Returns: + A `server_info_pb2.ServerInfoResponse` message. + """ + result = server_info_pb2.ServerInfoResponse() + result.compatibility.verdict = server_info_pb2.VERDICT_OK + result.api_server.endpoint = api_endpoint + url_format = result.url_format + placeholder = "{{EID}}" + while placeholder in frontend_origin: + placeholder = "{%s}" % placeholder + url_format.template = "%s/experiment/%s/" % (frontend_origin, placeholder) + url_format.id_placeholder = placeholder + return result + + +def experiment_url(server_info, experiment_id): + """Formats a URL that will resolve to the provided experiment. + + Args: + server_info: A `server_info_pb2.ServerInfoResponse` message. + experiment_id: A string; the ID of the experiment to link to. + + Returns: + A URL resolving to the given experiment, as a string. + """ + url_format = server_info.url_format + return url_format.template.replace(url_format.id_placeholder, experiment_id) + + +class CommunicationError(RuntimeError): + """Raised upon failure to communicate with the server.""" + + pass diff --git a/tensorboard/uploader/server_info_test.py b/tensorboard/uploader/server_info_test.py new file mode 100644 index 0000000000..64a8bdfb00 --- /dev/null +++ b/tensorboard/uploader/server_info_test.py @@ -0,0 +1,179 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for tensorboard.uploader.server_info.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import errno +import os +import socket +from wsgiref import simple_server + +from concurrent import futures +from werkzeug import wrappers + +from tensorboard import test as tb_test +from tensorboard import version +from tensorboard.uploader import server_info +from tensorboard.uploader.proto import server_info_pb2 + + +class FetchServerInfoTest(tb_test.TestCase): + """Tests for `fetch_server_info`.""" + + def _start_server(self, app): + """Starts a server and returns its origin ("http://localhost:PORT").""" + (_, localhost) = _localhost() + server_class = _make_ipv6_compatible_wsgi_server() + server = simple_server.make_server(localhost, 0, app, server_class) + executor = futures.ThreadPoolExecutor() + future = executor.submit(server.serve_forever, poll_interval=0.01) + + def cleanup(): + server.shutdown() # stop handling requests + server.server_close() # release port + future.result(timeout=3) # wait for server termination + + self.addCleanup(cleanup) + return "http://localhost:%d" % server.server_port + + def test_fetches_response(self): + expected_result = server_info_pb2.ServerInfoResponse() + expected_result.compatibility.verdict = server_info_pb2.VERDICT_OK + expected_result.compatibility.details = "all clear" + expected_result.api_server.endpoint = "api.example.com:443" + expected_result.url_format.template = "http://localhost:8080/{{eid}}" + expected_result.url_format.id_placeholder = "{{eid}}" + + @wrappers.BaseRequest.application + def app(request): + self.assertEqual(request.method, "POST") + self.assertEqual(request.path, "/api/uploader") + body = request.get_data() + request_pb = 
server_info_pb2.ServerInfoRequest.FromString(body) + self.assertEqual(request_pb.version, version.VERSION) + return wrappers.BaseResponse(expected_result.SerializeToString()) + + origin = self._start_server(app) + result = server_info.fetch_server_info(origin) + self.assertEqual(result, expected_result) + + def test_econnrefused(self): + (family, localhost) = _localhost() + s = socket.socket(family) + s.bind((localhost, 0)) + self.addCleanup(s.close) + port = s.getsockname()[1] + with self.assertRaises(server_info.CommunicationError) as cm: + server_info.fetch_server_info("http://localhost:%d" % port) + msg = str(cm.exception) + self.assertIn("Failed to connect to backend", msg) + if os.name != "nt": + self.assertIn(os.strerror(errno.ECONNREFUSED), msg) + + def test_non_ok_response(self): + @wrappers.BaseRequest.application + def app(request): + del request # unused + return wrappers.BaseResponse(b"very sad", status="502 Bad Gateway") + + origin = self._start_server(app) + with self.assertRaises(server_info.CommunicationError) as cm: + server_info.fetch_server_info(origin) + msg = str(cm.exception) + self.assertIn("Non-OK status from backend (502 Bad Gateway)", msg) + self.assertIn("very sad", msg) + + def test_corrupt_response(self): + @wrappers.BaseRequest.application + def app(request): + del request # unused + return wrappers.BaseResponse(b"an unlikely proto") + + origin = self._start_server(app) + with self.assertRaises(server_info.CommunicationError) as cm: + server_info.fetch_server_info(origin) + msg = str(cm.exception) + self.assertIn("Corrupt response from backend", msg) + self.assertIn("an unlikely proto", msg) + + def test_user_agent(self): + @wrappers.BaseRequest.application + def app(request): + result = server_info_pb2.ServerInfoResponse() + result.compatibility.details = request.headers["User-Agent"] + return wrappers.BaseResponse(result.SerializeToString()) + + origin = self._start_server(app) + result = server_info.fetch_server_info(origin) + 
expected_user_agent = "tensorboard/%s" % version.VERSION + self.assertEqual(result.compatibility.details, expected_user_agent) + + +class CreateServerInfoTest(tb_test.TestCase): + """Tests for `create_server_info`.""" + + def test(self): + frontend = "http://localhost:8080" + backend = "localhost:10000" + result = server_info.create_server_info(frontend, backend) + + expected_compatibility = server_info_pb2.Compatibility() + expected_compatibility.verdict = server_info_pb2.VERDICT_OK + expected_compatibility.details = "" + self.assertEqual(result.compatibility, expected_compatibility) + + expected_api_server = server_info_pb2.ApiServer() + expected_api_server.endpoint = backend + self.assertEqual(result.api_server, expected_api_server) + + url_format = result.url_format + actual_url = url_format.template.replace(url_format.id_placeholder, "123") + expected_url = "http://localhost:8080/experiment/123/" + self.assertEqual(actual_url, expected_url) + + +class ExperimentUrlTest(tb_test.TestCase): + """Tests for `experiment_url`.""" + + def test(self): + info = server_info_pb2.ServerInfoResponse() + info.url_format.template = "https://unittest.tensorboard.dev/x/???" + info.url_format.id_placeholder = "???" 
+ actual = server_info.experiment_url(info, "123") + self.assertEqual(actual, "https://unittest.tensorboard.dev/x/123") + + +def _localhost(): + """Gets family and nodename for a loopback address.""" + s = socket + infos = s.getaddrinfo(None, 0, s.AF_UNSPEC, s.SOCK_STREAM, 0, s.AI_ADDRCONFIG) + (family, _, _, _, address) = infos[0] + nodename = address[0] + return (family, nodename) + + +def _make_ipv6_compatible_wsgi_server(): + """Creates a `WSGIServer` subclass that works on IPv6-only machines.""" + address_family = _localhost()[0] + attrs = {"address_family": address_family} + bases = (simple_server.WSGIServer, object) # `object` needed for py2 + return type("_Ipv6CompatibleWsgiServer", bases, attrs) + + +if __name__ == "__main__": + tb_test.main() diff --git a/tensorboard/uploader/uploader.py b/tensorboard/uploader/uploader.py index 222a23977c..5cb780b1f5 100644 --- a/tensorboard/uploader/uploader.py +++ b/tensorboard/uploader/uploader.py @@ -89,12 +89,12 @@ def __init__(self, writer_client, logdir, rate_limiter=None): self._logdir, directory_loader_factory) def create_experiment(self): - """Creates an Experiment for this upload session and returns the URL.""" + """Creates an Experiment for this upload session and returns the ID.""" logger.info("Creating experiment") request = write_service_pb2.CreateExperimentRequest() response = grpc_util.call_with_retries(self._api.CreateExperiment, request) self._request_builder = _RequestBuilder(response.experiment_id) - return response.url + return response.experiment_id def start_uploading(self): """Blocks forever to continuously upload data from the logdir. 
diff --git a/tensorboard/uploader/uploader_main.py b/tensorboard/uploader/uploader_main.py index d9fc953847..87ac1b6276 100644 --- a/tensorboard/uploader/uploader_main.py +++ b/tensorboard/uploader/uploader_main.py @@ -31,11 +31,15 @@ import six from tensorboard.uploader import dev_creds +from tensorboard.uploader.proto import export_service_pb2 from tensorboard.uploader.proto import export_service_pb2_grpc from tensorboard.uploader.proto import write_service_pb2_grpc from tensorboard.uploader import auth from tensorboard.uploader import exporter as exporter_lib +from tensorboard.uploader import server_info as server_info_lib from tensorboard.uploader import uploader as uploader_lib +from tensorboard.uploader import util +from tensorboard.uploader.proto import server_info_pb2 from tensorboard import program from tensorboard.plugins import base_plugin @@ -59,11 +63,14 @@ _SUBCOMMAND_FLAG = '_uploader__subcommand' _SUBCOMMAND_KEY_UPLOAD = 'UPLOAD' _SUBCOMMAND_KEY_DELETE = 'DELETE' +_SUBCOMMAND_KEY_LIST = 'LIST' _SUBCOMMAND_KEY_EXPORT = 'EXPORT' _SUBCOMMAND_KEY_AUTH = 'AUTH' _AUTH_SUBCOMMAND_FLAG = '_uploader__subcommand_auth' _AUTH_SUBCOMMAND_KEY_REVOKE = 'REVOKE' +_DEFAULT_ORIGIN = "https://tensorboard.dev" + def _prompt_for_user_ack(intent): """Prompts for user consent, exiting the program if they decline.""" @@ -90,10 +97,19 @@ def _define_flags(parser): subparsers = parser.add_subparsers() parser.add_argument( - '--endpoint', + '--origin', + type=str, + default='', + help='Experimental. Origin for TensorBoard.dev service to which ' + 'to connect. If not set, defaults to %r.' % _DEFAULT_ORIGIN) + + parser.add_argument( + '--api_endpoint', type=str, - default='api.tensorboard.dev:443', - help='URL for the API server accepting write requests.') + default='', + help='Experimental. Direct URL for the API server accepting ' + 'write requests. 
If set, will skip initial server handshake ' + 'unless `--origin` is also set.') parser.add_argument( '--grpc_creds_type', @@ -135,6 +151,10 @@ def _define_flags(parser): default=None, help='ID of an experiment to delete permanently') + list_parser = subparsers.add_parser( + 'list', help='list previously uploaded experiments') + list_parser.set_defaults(**{_SUBCOMMAND_FLAG: _SUBCOMMAND_KEY_LIST}) + export = subparsers.add_parser( 'export', help='download all your experiment data') export.set_defaults(**{_SUBCOMMAND_FLAG: _SUBCOMMAND_KEY_EXPORT}) @@ -217,15 +237,26 @@ def _run(flags): msg = 'Invalid --grpc_creds_type %s' % flags.grpc_creds_type raise base_plugin.FlagsError(msg) + try: + server_info = _get_server_info(flags) + except server_info_lib.CommunicationError as e: + _die(str(e)) + _handle_server_info(server_info) + + if not server_info.api_server.endpoint: + logging.error('Server info response: %s', server_info) + _die('Internal error: frontend did not specify an API server') composite_channel_creds = grpc.composite_channel_credentials( channel_creds, auth.id_token_call_credentials(credentials)) # TODO(@nfelt): In the `_UploadIntent` case, consider waiting until # logdir exists to open channel. channel = grpc.secure_channel( - flags.endpoint, composite_channel_creds, options=channel_options) + server_info.api_server.endpoint, + composite_channel_creds, + options=channel_options) with channel: - intent.execute(channel) + intent.execute(server_info, channel) @six.add_metaclass(abc.ABCMeta) @@ -249,10 +280,11 @@ def get_ack_message_body(self): pass @abc.abstractmethod - def execute(self, channel): + def execute(self, server_info, channel): """Carries out this intent with the specified gRPC channel. Args: + server_info: A `server_info_pb2.ServerInfoResponse` value. channel: A connected gRPC channel whose server provides the TensorBoard reader and writer services. 
""" @@ -266,7 +298,7 @@ def get_ack_message_body(self): """Must not be called.""" raise AssertionError('No user ack needed to revoke credentials') - def execute(self, channel): + def execute(self, server_info, channel): """Execute handled specially by `main`. Must not be called.""" raise AssertionError('_AuthRevokeIntent should not be directly executed') @@ -291,7 +323,7 @@ def __init__(self, experiment_id): def get_ack_message_body(self): return self._MESSAGE_TEMPLATE.format(experiment_id=self.experiment_id) - def execute(self, channel): + def execute(self, server_info, channel): api_client = write_service_pb2_grpc.TensorBoardWriterServiceStub(channel) experiment_id = self.experiment_id if not experiment_id: @@ -312,6 +344,57 @@ def execute(self, channel): print('Deleted experiment %s.' % experiment_id) +class _ListIntent(_Intent): + """The user intends to list all their experiments.""" + + _MESSAGE = textwrap.dedent(u"""\ + This will list all experiments that you've uploaded to + https://tensorboard.dev. TensorBoard.dev experiments are visible + to everyone. Do not upload sensitive data. 
+ """) + + def get_ack_message_body(self): + return self._MESSAGE + + def execute(self, server_info, channel): + api_client = export_service_pb2_grpc.TensorBoardExporterServiceStub(channel) + fieldmask = export_service_pb2.ExperimentMask( + create_time=True, + update_time=True, + num_scalars=True, + num_runs=True, + num_tags=True, + ) + gen = exporter_lib.list_experiments(api_client, fieldmask=fieldmask) + count = 0 + for experiment in gen: + count += 1 + if not isinstance(experiment, export_service_pb2.Experiment): + url = server_info_lib.experiment_url(server_info, experiment) + print(url) + continue + experiment_id = experiment.experiment_id + url = server_info_lib.experiment_url(server_info, experiment_id) + print(url) + data = [ + ('Id', experiment.experiment_id), + ('Created', util.format_time(experiment.create_time)), + ('Updated', util.format_time(experiment.update_time)), + ('Scalars', str(experiment.num_scalars)), + ('Runs', str(experiment.num_runs)), + ('Tags', str(experiment.num_tags)), + ] + for (name, value) in data: + print('\t%s %s' % (name.ljust(10), value)) + sys.stdout.flush() + if not count: + sys.stderr.write( + 'No experiments. 
Use `tensorboard dev upload` to get started.\n') + else: + sys.stderr.write('Total: %d experiment(s)\n' % count) + sys.stderr.flush() + + class _UploadIntent(_Intent): """The user intends to upload an experiment from the given logdir.""" @@ -331,10 +414,11 @@ def __init__(self, logdir): def get_ack_message_body(self): return self._MESSAGE_TEMPLATE.format(logdir=self.logdir) - def execute(self, channel): + def execute(self, server_info, channel): api_client = write_service_pb2_grpc.TensorBoardWriterServiceStub(channel) uploader = uploader_lib.TensorBoardUploader(api_client, self.logdir) - url = uploader.create_experiment() + experiment_id = uploader.create_experiment() + url = server_info_lib.experiment_url(server_info, experiment_id) print("Upload started and will continue reading any new data as it's added") print("to the logdir. To stop uploading, press Ctrl-C.") print("View your TensorBoard live at: %s" % url) @@ -372,7 +456,7 @@ def __init__(self, output_dir): def get_ack_message_body(self): return self._MESSAGE_TEMPLATE.format(output_dir=self.output_dir) - def execute(self, channel): + def execute(self, server_info, channel): api_client = export_service_pb2_grpc.TensorBoardExporterServiceStub(channel) outdir = self.output_dir try: @@ -421,6 +505,8 @@ def _get_intent(flags): else: raise base_plugin.FlagsError( 'Must specify experiment to delete via `--experiment_id`.') + elif cmd == _SUBCOMMAND_KEY_LIST: + return _ListIntent() elif cmd == _SUBCOMMAND_KEY_EXPORT: if flags.outdir: return _ExportIntent(flags.outdir) @@ -439,6 +525,32 @@ def _get_intent(flags): raise AssertionError('Unknown subcommand %r' % (cmd,)) +def _get_server_info(flags): + origin = flags.origin or _DEFAULT_ORIGIN + if flags.api_endpoint and not flags.origin: + return server_info_lib.create_server_info(origin, flags.api_endpoint) + server_info = server_info_lib.fetch_server_info(origin) + # Override with any API server explicitly specified on the command + # line, but only if the server 
accepted our initial handshake. + if flags.api_endpoint and server_info.api_server.endpoint: + server_info.api_server.endpoint = flags.api_endpoint + return server_info + + +def _handle_server_info(info): + compat = info.compatibility + if compat.verdict == server_info_pb2.VERDICT_WARN: + sys.stderr.write('Warning [from server]: %s\n' % compat.details) + sys.stderr.flush() + elif compat.verdict == server_info_pb2.VERDICT_ERROR: + _die('Error [from server]: %s' % compat.details) + else: + # OK or unknown; assume OK. + if compat.details: + sys.stderr.write('%s\n' % compat.details) + sys.stderr.flush() + + def _die(message): sys.stderr.write('%s\n' % (message,)) sys.stderr.flush() diff --git a/tensorboard/uploader/uploader_test.py b/tensorboard/uploader/uploader_test.py index 427ce45ac8..f1abefb1cc 100644 --- a/tensorboard/uploader/uploader_test.py +++ b/tensorboard/uploader/uploader_test.py @@ -61,7 +61,7 @@ def _create_mock_client(self): stub = write_service_pb2_grpc.TensorBoardWriterServiceStub(test_channel) mock_client = mock.create_autospec(stub) fake_exp_response = write_service_pb2.CreateExperimentResponse( - experiment_id="123", url="https://example.com/123") + experiment_id="123", url="should not be used!") mock_client.CreateExperiment.return_value = fake_exp_response return mock_client @@ -69,8 +69,8 @@ def test_create_experiment(self): logdir = "/logs/foo" mock_client = self._create_mock_client() uploader = uploader_lib.TensorBoardUploader(mock_client, logdir) - url = uploader.create_experiment() - self.assertEqual(url, "https://example.com/123") + eid = uploader.create_experiment() + self.assertEqual(eid, "123") def test_start_uploading_without_create_experiment_fails(self): mock_client = self._create_mock_client() diff --git a/tensorboard/uploader/util.py b/tensorboard/uploader/util.py index 795758c4c1..16ad493bc5 100644 --- a/tensorboard/uploader/util.py +++ b/tensorboard/uploader/util.py @@ -18,6 +18,7 @@ from __future__ import division from __future__ 
import print_function +import datetime import errno import os import os.path @@ -112,3 +113,51 @@ def set_timestamp(pb, seconds_since_epoch): """ pb.seconds = int(seconds_since_epoch) pb.nanos = int(round((seconds_since_epoch % 1) * 10**9)) + + +def format_time(timestamp_pb, now=None): + """Converts a `timestamp_pb2.Timestamp` to human-readable string. + + This always includes the absolute date and time, and for recent dates + may include a relative time like "(just now)" or "(2 hours ago)". + + Args: + timestamp_pb: A `google.protobuf.timestamp_pb2.Timestamp` value to + convert to string. The input will not be modified. + now: A `datetime.datetime` object representing the current time, + used for determining relative times like "just now". Optional; + defaults to `datetime.datetime.now()`. + + Returns: + A string suitable for human consumption. + """ + + # Add and subtract a day for <https://bugs.python.org/issue29097>, + # which breaks early datetime conversions on Windows for small + # timestamps. + dt = datetime.datetime.fromtimestamp(timestamp_pb.seconds + 86400) + dt = dt - datetime.timedelta(seconds=86400) + + if now is None: + now = datetime.datetime.now() + ago = now.replace(microsecond=0) - dt + + def ago_text(n, singular, plural): + return "%d %s ago" % (n, singular if n == 1 else plural) + + relative = None + if ago < datetime.timedelta(seconds=5): + relative = "just now" + elif ago < datetime.timedelta(minutes=1): + relative = ago_text(int(ago.total_seconds()), "second", "seconds") + elif ago < datetime.timedelta(hours=1): + relative = ago_text(int(ago.total_seconds()) // 60, "minute", "minutes") + elif ago < datetime.timedelta(days=1): + relative = ago_text(int(ago.total_seconds()) // 3600, "hour", "hours") + + relative_part = " (%s)" % relative if relative is not None else "" + return str(dt) + relative_part + + +def _ngettext(n, singular, plural): + return "%d %s ago" % (n, singular if n == 1 else plural) diff --git a/tensorboard/uploader/util_test.py 
b/tensorboard/uploader/util_test.py index b0670d5315..090547f70c 100644 --- a/tensorboard/uploader/util_test.py +++ b/tensorboard/uploader/util_test.py @@ -18,8 +18,10 @@ from __future__ import division from __future__ import print_function +import datetime import os import unittest +import mock try: @@ -195,5 +197,50 @@ def test_set_timestamp(self): self.assertEqual(pb.nanos, 7812500) +class FormatTimeTest(tb_test.TestCase): + + def _run(self, t=None, now=None): + timestamp_pb = timestamp_pb2.Timestamp() + util.set_timestamp(timestamp_pb, t) + now = datetime.datetime.fromtimestamp(now) + with mock.patch.dict(os.environ, {"TZ": "UTC"}): + return util.format_time(timestamp_pb, now=now) + + def test_just_now(self): + base = 1546398245 + actual = self._run(t=base, now=base + 1) + self.assertEqual(actual, "2019-01-02 03:04:05 (just now)") + + def test_seconds_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 10) + self.assertEqual(actual, "2019-01-02 03:04:05 (10 seconds ago)") + + def test_minute_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 66) + self.assertEqual(actual, "2019-01-02 03:04:05 (1 minute ago)") + + def test_minutes_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 222) + self.assertEqual(actual, "2019-01-02 03:04:05 (3 minutes ago)") + + def test_hour_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 3601) + self.assertEqual(actual, "2019-01-02 03:04:05 (1 hour ago)") + + def test_hours_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 9999) + self.assertEqual(actual, "2019-01-02 03:04:05 (2 hours ago)") + + def test_long_ago(self): + base = 1546398245 + actual = self._run(t=base, now=base + 7 * 86400) + self.assertEqual(actual, "2019-01-02 03:04:05") + + if __name__ == "__main__": tb_test.main() diff --git a/tensorboard/version.py b/tensorboard/version.py index 09fea74ee6..5d0ee3c356 100644 --- a/tensorboard/version.py +++ 
b/tensorboard/version.py @@ -15,4 +15,4 @@ """Contains the version string.""" -VERSION = '2.0.1' +VERSION = '2.0.2'