Skip to content

Support for js-ipfs #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Nov 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 151 additions & 150 deletions doc/Documentation.csproj

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions doc/articles/envvars.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Environment variables

The following [environment variables](https://msdn.microsoft.com/en-us/library/windows/desktop/ms682653.aspx)
are used to control the behaviour of the Ipfs Client

| Name | Description |
| --- | --- |
| IpfsHttpUrl | The default URL to the IPFS HTTP API server. See [DefaultApiUri](xref:Ipfs.Api.IpfsClient.DefaultApiUri) for more details. |
16 changes: 9 additions & 7 deletions doc/articles/toc.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
- name: Introduction
href: intro.md
- name: Asynchronous I/O
href: async.md
items:
- name: Cancellation
href: cancellation.md
- name: Introduction
href: intro.md
- name: Asynchronous I/O
href: async.md
items:
- name: Cancellation
href: cancellation.md
- name: Environment variables
href: envvars.md
309 changes: 162 additions & 147 deletions src/CoreApi/PubSubApi.cs
Original file line number Diff line number Diff line change
@@ -1,147 +1,162 @@
using Common.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Ipfs.Api
{

/// <summary>
/// Allows you to publish messages to a given topic, and also to
/// subscribe to new messages on a given topic.
/// </summary>
/// <remarks>
/// This API is accessed via the <see cref="IpfsClient.PubSub"/> property.
/// <para>
/// This is an experimental feature. It is not intended in its current state
/// to be used in a production environment.
/// </para>
/// <para>
/// To use, the daemon must be run with '--enable-pubsub-experiment'.
/// </para>
/// </remarks>
/// <seealso href="https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/PUBSUB.md">PUBSUB API</seealso>
public class PubSubApi
{
static ILog log = LogManager.GetLogger<PubSubApi>();

IpfsClient ipfs;

internal PubSubApi(IpfsClient ipfs)
{
this.ipfs = ipfs;
}

/// <summary>
/// Get the subscribed topics.
/// </summary>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A sequence of <see cref="string"/> for each topic.
/// </returns>
public async Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
{
var json = await ipfs.DoCommandAsync("pubsub/ls", cancel);
var result = JObject.Parse(json);
var strings = result["Strings"] as JArray;
if (strings == null) return new string[0];
return strings.Select(s => (string)s);
}

/// <summary>
/// Get the peers that are pubsubing with us.
/// </summary>
/// <param name="topic">
/// When specified, only peers pubsubing on the topic are returned.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A sequence of <see cref="string"/> for each peer ID.
/// </returns>
public async Task<IEnumerable<string>> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken))
{
var json = await ipfs.DoCommandAsync("pubsub/peers", cancel, topic);
var result = JObject.Parse(json);
var strings = result["Strings"] as JArray;
if (strings == null) return new string[0];
return strings.Select(s => (string)s);
}

/// <summary>
/// Publish a message to a given topic.
/// </summary>
/// <param name="topic">
/// The topic name.
/// </param>
/// <param name="message">
/// The message to publish.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
public async Task Publish(string topic, string message, CancellationToken cancel = default(CancellationToken))
{
var _ = await ipfs.PostCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
return;
}

/// <summary>
/// Subscribe to messages on a given topic.
/// </summary>
/// <param name="topic">
/// The topic name.
/// </param>
/// <param name="handler">
/// The action to perform when a <see cref="PublishedMessage"/> is received.
/// </param>
/// <param name="cancellationToken">
/// Is used to stop the topic listener. When cancelled, the <see cref="OperationCanceledException"/>
/// is <b>NOT</b> raised.
/// </param>
/// <returns>
/// After the topic listener is register with the IPFS server.
/// </returns>
/// <remarks>
/// The <paramref name="handler"/> is invoked on the topic listener thread.
/// </remarks>
public async Task Subscribe(string topic, Action<PublishedMessage> handler, CancellationToken cancellationToken = default(CancellationToken))
{
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessMessages(topic, handler, messageStream, cancellationToken));
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
return;
}

void ProcessMessages(string topic, Action<PublishedMessage> handler, Stream stream, CancellationToken ct)
{
log.DebugFormat("Start listening for '{0}' messages", topic);
using (var sr = new StreamReader(stream))
{
while (!sr.EndOfStream && !ct.IsCancellationRequested)
{
var json = sr.ReadLine();
if (log.IsDebugEnabled)
log.DebugFormat("PubSub message {0}", json);
if (json != "{}" && !ct.IsCancellationRequested)
{
handler(new PublishedMessage(json));
}
}
}
log.DebugFormat("Stop listening for '{0}' messages", topic);
}

}

}
using Common.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Ipfs.Api
{

/// <summary>
/// Allows you to publish messages to a given topic, and also to
/// subscribe to new messages on a given topic.
/// </summary>
/// <remarks>
/// This API is accessed via the <see cref="IpfsClient.PubSub"/> property.
/// <para>
/// This is an experimental feature. It is not intended in its current state
/// to be used in a production environment.
/// </para>
/// <para>
/// To use, the daemon must be run with '--enable-pubsub-experiment'.
/// </para>
/// </remarks>
/// <seealso href="https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/PUBSUB.md">PUBSUB API</seealso>
public class PubSubApi
{
static ILog log = LogManager.GetLogger<PubSubApi>();

IpfsClient ipfs;

internal PubSubApi(IpfsClient ipfs)
{
this.ipfs = ipfs;
}

/// <summary>
/// Get the subscribed topics.
/// </summary>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A sequence of <see cref="string"/> for each topic.
/// </returns>
public async Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
{
var json = await ipfs.DoCommandAsync("pubsub/ls", cancel);
var result = JObject.Parse(json);
var strings = result["Strings"] as JArray;
if (strings == null) return new string[0];
return strings.Select(s => (string)s);
}

/// <summary>
/// Get the peers that are pubsubing with us.
/// </summary>
/// <param name="topic">
/// When specified, only peers pubsubing on the topic are returned.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A sequence of <see cref="string"/> for each peer ID.
/// </returns>
public async Task<IEnumerable<string>> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken))
{
var json = await ipfs.DoCommandAsync("pubsub/peers", cancel, topic);
var result = JObject.Parse(json);
var strings = result["Strings"] as JArray;
if (strings == null) return new string[0];
return strings.Select(s => (string)s);
}

/// <summary>
/// Publish a message to a given topic.
/// </summary>
/// <param name="topic">
/// The topic name.
/// </param>
/// <param name="message">
/// The message to publish.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
public async Task Publish(string topic, string message, CancellationToken cancel = default(CancellationToken))
{
var _ = await ipfs.PostCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
return;
}

/// <summary>
/// Subscribe to messages on a given topic.
/// </summary>
/// <param name="topic">
/// The topic name.
/// </param>
/// <param name="handler">
/// The action to perform when a <see cref="PublishedMessage"/> is received.
/// </param>
/// <param name="cancellationToken">
/// Is used to stop the topic listener. When cancelled, the <see cref="OperationCanceledException"/>
/// is <b>NOT</b> raised.
/// </param>
/// <returns>
/// After the topic listener is register with the IPFS server.
/// </returns>
/// <remarks>
/// The <paramref name="handler"/> is invoked on the topic listener thread.
/// </remarks>
public async Task Subscribe(string topic, Action<PublishedMessage> handler, CancellationToken cancellationToken)
{
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessMessages(topic, handler, messageStream, cancellationToken));
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
return;
}

void ProcessMessages(string topic, Action<PublishedMessage> handler, Stream stream, CancellationToken ct)
{
log.DebugFormat("Start listening for '{0}' messages", topic);

using (var sr = new StreamReader(stream))
{
// .Net needs a ReadLine(CancellationToken)
// As a work-around, we register a function to close the stream
ct.Register(() => sr.Dispose());
try
{
while (!sr.EndOfStream && !ct.IsCancellationRequested)
{
var json = sr.ReadLine();
if (json == null)
break;
if (log.IsDebugEnabled)
log.DebugFormat("PubSub message {0}", json);
if (json != "{}" && !ct.IsCancellationRequested)
{
handler(new PublishedMessage(json));
}
}
}
catch (Exception e)
{
// Do not report errors when cancelled.
if (!ct.IsCancellationRequested)
log.Error(e);
}
}
log.DebugFormat("Stop listening for '{0}' messages", topic);
}

}

}
2 changes: 1 addition & 1 deletion src/CoreApi/SwarmApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ internal SwarmApi(IpfsClient ipfs)

TimeSpan ParseLatency(string latency)
{
if (latency == "n/a")
if (latency == "n/a" || latency == "unknown")
{
return TimeSpan.Zero;
}
Expand Down
2 changes: 1 addition & 1 deletion src/IpfsApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Ipfs.Core" Version="0.8.3" />
<PackageReference Include="Ipfs.Core" Version="0.10.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'netstandard14'" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'net45'" />
Expand Down
19 changes: 16 additions & 3 deletions src/IpfsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ public partial class IpfsClient
static HttpClient api = null;

/// <summary>
/// The default URL to the IPFS API server. The default is "http://localhost:5001".
/// The default URL to the IPFS HTTP API server.
/// </summary>
public static Uri DefaultApiUri = new Uri("http://localhost:5001");
/// <value>
/// The default is "http://localhost:5001".
/// </value>
/// <remarks>
/// The environment variable "IpfsHttpApi" overrides this value.
/// </remarks>
public static Uri DefaultApiUri = new Uri(
Environment.GetEnvironmentVariable("IpfsHttpApi")
?? "http://localhost:5001");

/// <summary>
/// Creates a new instance of the <see cref="IpfsClient"/> class and sets the
Expand Down Expand Up @@ -508,7 +516,12 @@ async Task<bool> ThrowOnErrorAsync(HttpResponseMessage response)
if (response.IsSuccessStatusCode)
return true;
if (response.StatusCode == HttpStatusCode.NotFound)
throw new HttpRequestException("Invalid IPFS command");
{
var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri.ToString();
if (log.IsDebugEnabled)
log.Debug("ERR " + error);
throw new HttpRequestException(error);
}

var body = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
Expand Down
Loading