Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
doitian committed Mar 26, 2021
2 parents db27543 + 346663d commit 32acd32
Show file tree
Hide file tree
Showing 15 changed files with 843 additions and 254 deletions.
378 changes: 167 additions & 211 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 14 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
[package]
name = "ckb-cli"
version = "0.40.0"
version = "0.41.0"
license = "MIT"
authors = ["Linfeng Qian <[email protected]>", "Nervos Core Dev <[email protected]>"]
edition = "2018"

[dependencies]
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2", features = ["secp"] }
ckb-build-info = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-util = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-resource = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-dao-utils = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-chain-spec = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1", features = ["secp"] }
ckb-build-info = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-util = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-resource = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-dao-utils = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-chain-spec = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-sdk = { path = "ckb-sdk" }
ckb-index = { path = "ckb-index" }
plugin-protocol = { path = "plugin-protocol", package = "ckb-cli-plugin-protocol" }
Expand Down Expand Up @@ -52,12 +52,15 @@ multiaddr = { package = "parity-multiaddr", version = "0.4.0" }
byteorder = "1.3.2"
itertools = "0.8.0"

tokio = { version = "1", features = ["net", "io-util", "rt"] }
futures = "0.3"

[target.'cfg(unix)'.dependencies]
tui = "0.6.0"
termion = "1.5"

[build-dependencies]
ckb-build-info = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-build-info = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }

[workspace]
members = ["ckb-sdk", "ckb-index", "ckb-sdk-types", "plugin-protocol"]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ci: fmt clippy test security-audit
git diff --exit-code Cargo.lock

integration:
bash devtools/ci/integration.sh v0.40.0-rc2
bash devtools/ci/integration.sh v0.41.0-rc1

prod: ## Build binary with release profile.
cargo build --release
Expand Down
4 changes: 2 additions & 2 deletions ckb-index/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ckb-index"
version = "0.40.0"
version = "0.41.0"
authors = ["Linfeng Qian <[email protected]>", "Nervos Core Dev <[email protected]>"]
edition = "2018"
license = "MIT"
Expand All @@ -11,6 +11,6 @@ serde_derive = "1.0"
bincode = "1.1.4"
log = "0.4.6"
failure = "0.1.5"
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-sdk = { path = "../ckb-sdk" }
rocksdb = { package = "ckb-rocksdb", version = "=0.13.0", features = ["snappy"] }
14 changes: 7 additions & 7 deletions ckb-sdk-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ckb-sdk-types"
version = "0.40.0"
version = "0.41.0"
authors = ["Linfeng Qian <[email protected]>", "Nervos Core Dev <[email protected]>"]
edition = "2018"
license = "MIT"
Expand All @@ -9,11 +9,11 @@ license = "MIT"
serde = { version = "1.0", features = ["rc"] }
serde_derive = "1.0"

ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-traits = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-error = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-traits = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-error = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }

[dev-dependencies]
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2", features = ["secp"] }
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1", features = ["secp"] }
20 changes: 12 additions & 8 deletions ckb-sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ckb-sdk"
version = "0.40.0"
version = "0.41.0"
authors = ["Linfeng Qian <[email protected]>", "Nervos Core Dev <[email protected]>"]
edition = "2018"
license = "MIT"
Expand All @@ -24,12 +24,16 @@ bitcoin_hashes = "0.3.2"
uuid = { version = "0.7.4", features = ["v4"] }
chrono = "0.4.6"
failure = "0.1.5"
tokio-util = { version = "0.6", features = ["codec"] }
tokio = { version = "1" }
bytes = "1"
futures = "0.3"

ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-error = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-script = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-resource = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2" }
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.40.0-rc2", features = ["secp"] }
ckb-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-error = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-script = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-jsonrpc-types = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-hash = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-resource = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1" }
ckb-crypto = { git = "https://github.com/nervosnetwork/ckb", tag = "v0.41.0-rc1", features = ["secp"] }
ckb-sdk-types = { path = "../ckb-sdk-types" }
1 change: 1 addition & 0 deletions ckb-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod tx_helper;
mod types;

pub mod constants;
pub mod pubsub;
pub mod rpc;
pub mod wallet;

Expand Down
212 changes: 212 additions & 0 deletions ckb-sdk/src/pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/// This module provides a general rpc subscription client,
/// you can use it with any connection method that implements `AsyncWrite + AsyncRead`.
/// The simplest TCP connection is as follows:
///
/// ```ignore
/// use ckb_jsonrpc_types::HeaderView;
/// use ckb_types::core::HeaderView as CoreHeaderView;
/// use tokio::net::{TcpStream, ToSocketAddrs};
///
/// pub async fn new_tcp_client(addr: impl ToSocketAddrs) -> io::Result<Client<TcpStream>> {
/// let tcp = TcpStream::connect(addr).await?;
/// Ok(Client::new(tcp))
/// }
///
/// fn main() {
/// let mut rt = tokio::runtime::Runtime::new().unwrap();
/// rt.block_on(async {
/// let c = new_tcp_client("127.0.0.1:18114").await.unwrap();
/// let mut h = c
/// .subscribe::<HeaderView>("new_tip_header")
/// .await
/// .unwrap();
/// while let Some(Ok(r)) = h.next().await {
/// let core: CoreHeaderView = r.into();
/// println!(
/// "number: {}, difficulty: {}, epoch: {}, timestamp: {}",
/// core.number(),
/// core.difficulty(),
/// core.epoch(),
/// core.timestamp()
/// )
/// }
/// });
/// }
/// ```
///
use std::{
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use futures::{
sink::SinkExt,
stream::{Stream, StreamExt},
};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Framed;

use stream_codec::StreamCodec;

mod stream_codec;

/// General rpc subscription client
pub struct Client<T> {
inner: Framed<T, StreamCodec>,
id: usize,
}

impl<T> Client<T>
where
T: tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin,
{
/// New a pubsub rpc client
pub fn new(io: T) -> Client<T> {
let inner = Framed::new(io, StreamCodec::stream_incoming());
Client { inner, id: 0 }
}

/// Subscription a topic
pub async fn subscribe<F: for<'de> serde::de::Deserialize<'de>>(
mut self,
name: &str,
) -> io::Result<Handle<T, F>> {
// telnet localhost 18114
// > {"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}
// < {"jsonrpc":"2.0","result":0,"id":2}
// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
// "subscription":0}}
// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
// "subscription":0}}
// < ...
// > {"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": [0]}
// < {"jsonrpc":"2.0","result":true,"id":2}

let req_json = format!(
r#"{{"id": {}, "jsonrpc": "2.0", "method": "subscribe", "params": ["{}"]}}"#,
self.id, name
);
self.id = self.id.wrapping_add(1);

self.inner.send(req_json).await?;
let (resp, inner) = self.inner.into_future().await;
let output = serde_json::from_slice::<ckb_jsonrpc_types::response::Output>(
&resp.ok_or_else::<io::Error, _>(|| io::ErrorKind::BrokenPipe.into())??,
)?;

match output {
ckb_jsonrpc_types::response::Output::Success(success) => {
let res = serde_json::from_value::<String>(success.result).unwrap();
Ok(Handle {
inner,
topic: name.to_string(),
sub_id: res,
output: PhantomData::default(),
rpc_id: self.id,
})
}
ckb_jsonrpc_types::response::Output::Failure(e) => {
Err(io::Error::new(io::ErrorKind::InvalidData, e.error))
}
}
}
}

/// General rpc subscription topic handle
pub struct Handle<T, F> {
inner: Framed<T, StreamCodec>,
topic: String,
sub_id: String,
output: PhantomData<F>,
rpc_id: usize,
}

impl<T, F> Handle<T, F>
where
T: tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin,
{
/// Sub id
pub fn id(&self) -> &str {
&self.sub_id
}

/// Topic name
pub fn topic(&self) -> &str {
&self.topic
}

/// Unsubscribe and return this Client
pub async fn unsubscribe(mut self) -> io::Result<Client<T>> {
let req_json = format!(
r#"{{"id": {}, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["{}"]}}"#,
self.rpc_id, self.sub_id
);
self.rpc_id = self.rpc_id.wrapping_add(1);

self.inner.send(req_json).await?;

let mut inner = self.inner;

let (output, inner) = loop {
let (resp, next_inner) = inner.into_future().await;

// Since the current subscription state, the return value may be a notification,
// we need to ensure that the unsubscribed message returns before jumping out
if let Ok(output) = serde_json::from_slice::<ckb_jsonrpc_types::response::Output>(
&resp.ok_or_else::<io::Error, _>(|| io::ErrorKind::BrokenPipe.into())??,
) {
break (output, next_inner);
} else {
inner = next_inner;
}
};

match output {
ckb_jsonrpc_types::response::Output::Success(_) => Ok(Client {
inner,
id: self.rpc_id,
}),
ckb_jsonrpc_types::response::Output::Failure(e) => {
Err(io::Error::new(io::ErrorKind::InvalidData, e.error))
}
}
}
}

impl<T, F> Stream for Handle<T, F>
where
F: for<'de> serde::de::Deserialize<'de> + Unpin,
T: tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin,
{
type Item = io::Result<F>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(frame))) => {
let output =
serde_json::from_slice::<ckb_jsonrpc_types::request::Notification>(&frame)
.expect("must parse to notification");
let message = output
.params
.parse::<Message>()
.expect("must parse to message");

Poll::Ready(Some(
serde_json::from_str::<F>(&message.result)
.map_err(|_| io::ErrorKind::InvalidData.into()),
))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
}
}
}

#[derive(Deserialize, Serialize, Debug)]
struct Message {
result: String,
subscription: String,
}
Loading

0 comments on commit 32acd32

Please sign in to comment.