TUN-813: Clean up cloudflared dependencies

This commit is contained in:
Areg Harutyunyan
2018-07-24 18:04:33 -05:00
parent d06fc520c7
commit 0468866626
3310 changed files with 993 additions and 1223303 deletions

View File

@@ -1,55 +0,0 @@
package rpc_test
import (
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
)
func BenchmarkPingPong(b *testing.B) {
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{b}
c := rpc.NewConn(p, rpc.ConnLog(log))
d := rpc.NewConn(q, rpc.ConnLog(log), rpc.BootstrapFunc(bootstrapPingPong))
defer d.Wait()
defer c.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := testcapnp.PingPong{Client: c.Bootstrap(ctx)}
b.ResetTimer()
for i := 0; i < b.N; i++ {
promise := client.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(42)
return nil
})
result, err := promise.Struct()
if err != nil {
b.Errorf("EchoNum(42) failed on iteration %d: %v", i, err)
break
}
if result.N() != 42 {
b.Errorf("EchoNum(42) = %d; want 42", result.N())
break
}
}
}
func bootstrapPingPong(ctx context.Context) (capnp.Client, error) {
return testcapnp.PingPong_ServerToClient(pingPongServer{}).Client, nil
}
type pingPongServer struct{}
func (pingPongServer) EchoNum(call testcapnp.PingPong_echoNum) error {
call.Results.SetN(call.Params.N())
return nil
}

View File

@@ -1,52 +0,0 @@
package rpc_test
import (
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
"zombiezen.com/go/capnproto2/server"
)
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := testLogger{t}
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
c := rpc.NewConn(p, rpc.ConnLog(log))
notify := make(chan struct{})
hanger := testcapnp.Hanger_ServerToClient(Hanger{notify: notify})
d := rpc.NewConn(q, rpc.MainInterface(hanger.Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.Hanger{Client: c.Bootstrap(ctx)}
subctx, subcancel := context.WithCancel(ctx)
promise := client.Hang(subctx, nil)
<-notify
subcancel()
_, err := promise.Struct()
<-notify // test will deadlock if cancel not delivered
if err != context.Canceled {
t.Errorf("promise.Get() error: %v; want %v", err, context.Canceled)
}
}
type Hanger struct {
notify chan struct{}
}
func (h Hanger) Hang(call testcapnp.Hanger_hang) error {
server.Ack(call.Options)
h.notify <- struct{}{}
<-call.Ctx.Done()
close(h.notify)
return nil
}

View File

@@ -1,91 +0,0 @@
package rpc_test
import (
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
)
func TestEmbargo(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{t}
c := rpc.NewConn(p, rpc.ConnLog(log))
echoSrv := testcapnp.Echoer_ServerToClient(new(Echoer))
d := rpc.NewConn(q, rpc.MainInterface(echoSrv.Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.Echoer{Client: c.Bootstrap(ctx)}
localCap := testcapnp.CallOrder_ServerToClient(new(CallOrder))
earlyCall := callseq(ctx, client.Client, 0)
echo := client.Echo(ctx, func(p testcapnp.Echoer_echo_Params) error {
return p.SetCap(localCap)
})
pipeline := echo.Cap()
call0 := callseq(ctx, pipeline.Client, 0)
call1 := callseq(ctx, pipeline.Client, 1)
_, err := earlyCall.Struct()
if err != nil {
t.Errorf("earlyCall error: %v", err)
}
call2 := callseq(ctx, pipeline.Client, 2)
_, err = echo.Struct()
if err != nil {
t.Errorf("echo.Get() error: %v", err)
}
call3 := callseq(ctx, pipeline.Client, 3)
call4 := callseq(ctx, pipeline.Client, 4)
call5 := callseq(ctx, pipeline.Client, 5)
check := func(promise testcapnp.CallOrder_getCallSequence_Results_Promise, n uint32) {
r, err := promise.Struct()
if err != nil {
t.Errorf("call%d error: %v", n, err)
}
if r.N() != n {
t.Errorf("call%d = %d; want %d", n, r.N(), n)
}
}
check(call0, 0)
check(call1, 1)
check(call2, 2)
check(call3, 3)
check(call4, 4)
check(call5, 5)
}
func callseq(c context.Context, client capnp.Client, n uint32) testcapnp.CallOrder_getCallSequence_Results_Promise {
return testcapnp.CallOrder{Client: client}.GetCallSequence(c, func(p testcapnp.CallOrder_getCallSequence_Params) error {
p.SetExpected(n)
return nil
})
}
type CallOrder struct {
n uint32
}
func (co *CallOrder) GetCallSequence(call testcapnp.CallOrder_getCallSequence) error {
call.Results.SetN(co.n)
co.n++
return nil
}
type Echoer struct {
CallOrder
}
func (*Echoer) Echo(call testcapnp.Echoer_echo) error {
call.Results.SetCap(call.Params.Cap())
return nil
}

View File

@@ -1,75 +0,0 @@
package rpc_test
import (
"fmt"
"net"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
"zombiezen.com/go/capnproto2/server"
)
func Example() {
// Create an in-memory transport. In a real application, you would probably
// use a net.TCPConn (for RPC) or an os.Pipe (for IPC).
p1, p2 := net.Pipe()
t1, t2 := rpc.StreamTransport(p1), rpc.StreamTransport(p2)
// Server-side
srv := testcapnp.Adder_ServerToClient(AdderServer{})
serverConn := rpc.NewConn(t1, rpc.MainInterface(srv.Client))
defer serverConn.Wait()
// Client-side
ctx := context.Background()
clientConn := rpc.NewConn(t2)
defer clientConn.Close()
adderClient := testcapnp.Adder{Client: clientConn.Bootstrap(ctx)}
// Every client call returns a promise. You can make multiple calls
// concurrently.
call1 := adderClient.Add(ctx, func(p testcapnp.Adder_add_Params) error {
p.SetA(5)
p.SetB(2)
return nil
})
call2 := adderClient.Add(ctx, func(p testcapnp.Adder_add_Params) error {
p.SetA(10)
p.SetB(20)
return nil
})
// Calling Struct() on a promise waits until it returns.
result1, err := call1.Struct()
if err != nil {
fmt.Println("Add #1 failed:", err)
return
}
result2, err := call2.Struct()
if err != nil {
fmt.Println("Add #2 failed:", err)
return
}
fmt.Println("Results:", result1.Result(), result2.Result())
// Output:
// Results: 7 30
}
// An AdderServer is a local implementation of the Adder interface.
type AdderServer struct{}
// Add implements a method
func (AdderServer) Add(call testcapnp.Adder_add) error {
// Acknowledging the call allows other calls to be made (it returns the Answer
// to the caller).
server.Ack(call.Options)
// Parameters are accessed with call.Params.
a := call.Params.A()
b := call.Params.B()
// A result struct is allocated for you at call.Results.
call.Results.SetResult(a + b)
return nil
}

View File

@@ -1,15 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["logtransport.go"],
importpath = "zombiezen.com/go/capnproto2/rpc/internal/logtransport",
visibility = ["//rpc:__subpackages__"],
deps = [
"//encoding/text:go_default_library",
"//rpc:go_default_library",
"//rpc/internal/logutil:go_default_library",
"//std/capnp/rpc:go_default_library",
"@org_golang_x_net//context:go_default_library",
],
)

View File

@@ -1,52 +0,0 @@
// Package logtransport provides a transport that logs all of its messages.
package logtransport
import (
"bytes"
"io"
"log"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2/encoding/text"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logutil"
rpccapnp "zombiezen.com/go/capnproto2/std/capnp/rpc"
)
type transport struct {
rpc.Transport
l *log.Logger
sendBuf bytes.Buffer
recvBuf bytes.Buffer
}
// New creates a new logger that proxies messages to and from t and
// logs them to l. If l is nil, then the log package's default
// logger is used.
func New(l *log.Logger, t rpc.Transport) rpc.Transport {
return &transport{Transport: t, l: l}
}
func (t *transport) SendMessage(ctx context.Context, msg rpccapnp.Message) error {
t.sendBuf.Reset()
t.sendBuf.WriteString("<- ")
formatMsg(&t.sendBuf, msg)
logutil.Print(t.l, t.sendBuf.String())
return t.Transport.SendMessage(ctx, msg)
}
func (t *transport) RecvMessage(ctx context.Context) (rpccapnp.Message, error) {
msg, err := t.Transport.RecvMessage(ctx)
if err != nil {
return msg, err
}
t.recvBuf.Reset()
t.recvBuf.WriteString("-> ")
formatMsg(&t.recvBuf, msg)
logutil.Print(t.l, t.recvBuf.String())
return msg, nil
}
func formatMsg(w io.Writer, m rpccapnp.Message) {
text.NewEncoder(w).Encode(0x91b79f1f808db032, m.Struct)
}

View File

@@ -1,8 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["logutil.go"],
importpath = "zombiezen.com/go/capnproto2/rpc/internal/logutil",
visibility = ["//rpc:__subpackages__"],
)

View File

@@ -1,36 +0,0 @@
// Package logutil provides functions that can print to a logger.
// Any function in this package that takes in a *log.Logger can be
// passed nil to use the log package's default logger.
package logutil
import "log"
// Print calls Print on a logger or the default logger.
// Arguments are handled in the manner of fmt.Print.
func Print(l *log.Logger, v ...interface{}) {
if l == nil {
log.Print(v...)
} else {
l.Print(v...)
}
}
// Printf calls Printf on a logger or the default logger.
// Arguments are handled in the manner of fmt.Printf.
func Printf(l *log.Logger, format string, v ...interface{}) {
if l == nil {
log.Printf(format, v...)
} else {
l.Printf(format, v...)
}
}
// Println calls Println on a logger or the default logger.
// Arguments are handled in the manner of fmt.Println.
func Println(l *log.Logger, v ...interface{}) {
if l == nil {
log.Println(v...)
} else {
l.Println(v...)
}
}

View File

@@ -1,14 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["pipetransport.go"],
importpath = "zombiezen.com/go/capnproto2/rpc/internal/pipetransport",
visibility = ["//rpc:__subpackages__"],
deps = [
"//:go_default_library",
"//rpc:go_default_library",
"//std/capnp/rpc:go_default_library",
"@org_golang_x_net//context:go_default_library",
],
)

View File

@@ -1,139 +0,0 @@
// Package pipetransport provides in-memory implementations of rpc.Transport for testing.
package pipetransport
import (
"bytes"
"errors"
"sync"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
rpccapnp "zombiezen.com/go/capnproto2/std/capnp/rpc"
)
type pipeTransport struct {
r <-chan rpccapnp.Message
w chan<- rpccapnp.Message
finish chan struct{}
otherFin chan struct{}
rbuf bytes.Buffer
mu sync.Mutex
inflight int
done bool
}
// New creates a synchronous in-memory pipe transport.
func New() (p, q rpc.Transport) {
a, b := make(chan rpccapnp.Message), make(chan rpccapnp.Message)
afin, bfin := make(chan struct{}), make(chan struct{})
p = &pipeTransport{
r: a,
w: b,
finish: afin,
otherFin: bfin,
}
q = &pipeTransport{
r: b,
w: a,
finish: bfin,
otherFin: afin,
}
return
}
func (p *pipeTransport) SendMessage(ctx context.Context, msg rpccapnp.Message) error {
if !p.startSend() {
return errClosed
}
defer p.finishSend()
buf, err := msg.Segment().Message().Marshal()
if err != nil {
return err
}
mm, err := capnp.Unmarshal(buf)
if err != nil {
return err
}
msg, err = rpccapnp.ReadRootMessage(mm)
if err != nil {
return err
}
select {
case p.w <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
case <-p.finish:
return errClosed
case <-p.otherFin:
return errBrokenPipe
}
}
func (p *pipeTransport) startSend() bool {
p.mu.Lock()
ok := !p.done
if ok {
p.inflight++
}
p.mu.Unlock()
return ok
}
func (p *pipeTransport) finishSend() {
p.mu.Lock()
p.inflight--
p.mu.Unlock()
}
func (p *pipeTransport) RecvMessage(ctx context.Context) (rpccapnp.Message, error) {
// Scribble over shared buffer to test for race conditions.
for b, i := p.rbuf.Bytes(), 0; i < len(b); i++ {
b[i] = 0xff
}
p.rbuf.Reset()
select {
case msg, ok := <-p.r:
if !ok {
return rpccapnp.Message{}, errBrokenPipe
}
if err := capnp.NewEncoder(&p.rbuf).Encode(msg.Segment().Message()); err != nil {
return rpccapnp.Message{}, err
}
m, err := capnp.Unmarshal(p.rbuf.Bytes())
if err != nil {
return rpccapnp.Message{}, err
}
return rpccapnp.ReadRootMessage(m)
case <-ctx.Done():
return rpccapnp.Message{}, ctx.Err()
}
}
func (p *pipeTransport) Close() error {
p.mu.Lock()
done := p.done
if !done {
p.done = true
close(p.finish)
if p.inflight == 0 {
close(p.w)
}
}
p.mu.Unlock()
if done {
return errClosed
}
return nil
}
var (
errBrokenPipe = errors.New("pipetransport: broken pipe")
errClosed = errors.New("pipetransport: write to broken pipe")
)

View File

@@ -1,89 +0,0 @@
package refcount
import (
"testing"
"zombiezen.com/go/capnproto2"
)
func TestSingleRefCloses(t *testing.T) {
c := new(fakeClient)
_, ref := New(c)
err := ref.Close()
if err != nil {
t.Errorf("ref.Close(): %v", err)
}
if c.closed != 1 {
t.Errorf("client Close() called %d times; want 1 time", c.closed)
}
}
func TestCloseRefMultipleDecrefsOnce(t *testing.T) {
c := new(fakeClient)
rc, ref1 := New(c)
ref2 := rc.Ref()
err1 := ref1.Close()
err2 := ref1.Close()
_ = ref2
if err1 != nil {
t.Errorf("ref.Close() #1: %v", err1)
}
if err2 != errClosed {
t.Errorf("ref.Close() #2: %v; want %v", err2, errClosed)
}
if c.closed != 0 {
t.Errorf("client Close() called %d times; want 0 times", c.closed)
}
}
func TestClosingOneOfManyRefsDoesntClose(t *testing.T) {
c := new(fakeClient)
rc, ref1 := New(c)
ref2 := rc.Ref()
err := ref1.Close()
_ = ref2
if err != nil {
t.Errorf("ref1.Close(): %v", err)
}
if c.closed != 0 {
t.Errorf("client Close() called %d times; want 0 times", c.closed)
}
}
func TestClosingAllRefsCloses(t *testing.T) {
c := new(fakeClient)
rc, ref1 := New(c)
ref2 := rc.Ref()
err1 := ref1.Close()
err2 := ref2.Close()
if err1 != nil {
t.Errorf("ref1.Close(): %v", err1)
}
if err2 != nil {
t.Errorf("ref2.Close(): %v", err2)
}
if c.closed != 1 {
t.Errorf("client Close() called %d times; want 1 times", c.closed)
}
}
type fakeClient struct {
closed int
}
func (c *fakeClient) Call(cl *capnp.Call) capnp.Answer {
panic("not implemented")
}
func (c *fakeClient) Close() error {
c.closed++
return nil
}

View File

@@ -1,18 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"generate.go",
"test.capnp.go",
],
importpath = "zombiezen.com/go/capnproto2/rpc/internal/testcapnp",
visibility = ["//rpc:__subpackages__"],
deps = [
"//:go_default_library",
"//encoding/text:go_default_library",
"//schemas:go_default_library",
"//server:go_default_library",
"@org_golang_x_net//context:go_default_library",
],
)

View File

@@ -1,3 +0,0 @@
package testcapnp
//go:generate capnp compile -I ../../../std -ogo test.capnp

View File

@@ -1,40 +0,0 @@
# Test interfaces for RPC tests.
using Go = import "/go.capnp";
@0xef12a34b9807e19c;
$Go.package("testcapnp");
$Go.import("zombiezen.com/go/capnproto2/rpc/internal/testcapnp");
interface Handle {}
interface HandleFactory {
newHandle @0 () -> (handle :Handle);
}
interface Hanger {
hang @0 () -> ();
# Block until context is cancelled
}
interface CallOrder {
getCallSequence @0 (expected: UInt32) -> (n: UInt32);
# First call returns 0, next returns 1, ...
#
# The input `expected` is ignored but useful for disambiguating debug logs.
}
interface Echoer extends(CallOrder) {
echo @0 (cap :CallOrder) -> (cap :CallOrder);
# Just returns the input cap.
}
interface PingPong {
echoNum @0 (n :Int32) -> (n :Int32);
}
# Example interfaces
interface Adder {
add @0 (a :Int32, b :Int32) -> (result :Int32);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,47 +0,0 @@
package rpc_test
import (
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
)
func TestIssue3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{t}
c := rpc.NewConn(p, rpc.ConnLog(log))
echoSrv := testcapnp.Echoer_ServerToClient(new(SideEffectEchoer))
d := rpc.NewConn(q, rpc.MainInterface(echoSrv.Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.Echoer{Client: c.Bootstrap(ctx)}
localCap := testcapnp.CallOrder_ServerToClient(new(CallOrder))
echo := client.Echo(ctx, func(p testcapnp.Echoer_echo_Params) error {
return p.SetCap(localCap)
})
// This should not deadlock.
_, err := echo.Struct()
if err != nil {
t.Error("Echo error:", err)
}
}
type SideEffectEchoer struct {
CallOrder
}
func (*SideEffectEchoer) Echo(call testcapnp.Echoer_echo) error {
call.Params.Cap().GetCallSequence(call.Ctx, nil)
call.Results.SetCap(call.Params.Cap())
return nil
}

View File

@@ -1,60 +0,0 @@
package rpc_test
import (
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
"zombiezen.com/go/capnproto2/server"
)
func TestPromisedCapability(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{t}
c := rpc.NewConn(p, rpc.ConnLog(log))
delay := make(chan struct{})
echoSrv := testcapnp.Echoer_ServerToClient(&DelayEchoer{delay: delay})
d := rpc.NewConn(q, rpc.MainInterface(echoSrv.Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.Echoer{Client: c.Bootstrap(ctx)}
echo := client.Echo(ctx, func(p testcapnp.Echoer_echo_Params) error {
return p.SetCap(testcapnp.CallOrder{Client: client.Client})
})
pipeline := echo.Cap()
call0 := callseq(ctx, pipeline.Client, 0)
call1 := callseq(ctx, pipeline.Client, 1)
close(delay)
check := func(promise testcapnp.CallOrder_getCallSequence_Results_Promise, n uint32) {
r, err := promise.Struct()
if err != nil {
t.Errorf("call%d error: %v", n, err)
}
if r.N() != n {
t.Errorf("call%d = %d; want %d", n, r.N(), n)
}
}
check(call0, 0)
check(call1, 1)
}
type DelayEchoer struct {
Echoer
delay chan struct{}
}
func (de *DelayEchoer) Echo(call testcapnp.Echoer_echo) error {
server.Ack(call.Options)
<-de.delay
return de.Echoer.Echo(call)
}

View File

@@ -1,145 +0,0 @@
package rpc_test
import (
"sync"
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
"zombiezen.com/go/capnproto2/rpc/internal/testcapnp"
"zombiezen.com/go/capnproto2/server"
)
func TestRelease(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{t}
c := rpc.NewConn(p, rpc.ConnLog(log))
hf := new(HandleFactory)
d := rpc.NewConn(q, rpc.MainInterface(testcapnp.HandleFactory_ServerToClient(hf).Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.HandleFactory{Client: c.Bootstrap(ctx)}
r, err := client.NewHandle(ctx, nil).Struct()
if err != nil {
t.Fatal("NewHandle:", err)
}
handle := r.Handle()
if n := hf.numHandles(); n != 1 {
t.Fatalf("numHandles = %d; want 1", n)
}
if err := handle.Client.Close(); err != nil {
t.Error("handle.Client.Close():", err)
}
flushConn(ctx, c)
if n := hf.numHandles(); n != 0 {
t.Errorf("numHandles = %d; want 0", n)
}
}
func TestReleaseAlias(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
log := testLogger{t}
c := rpc.NewConn(p, rpc.ConnLog(log))
hf := singletonHandleFactory()
d := rpc.NewConn(q, rpc.MainInterface(testcapnp.HandleFactory_ServerToClient(hf).Client), rpc.ConnLog(log))
defer d.Wait()
defer c.Close()
client := testcapnp.HandleFactory{Client: c.Bootstrap(ctx)}
r1, err := client.NewHandle(ctx, nil).Struct()
if err != nil {
t.Fatal("NewHandle #1:", err)
}
handle1 := r1.Handle()
r2, err := client.NewHandle(ctx, nil).Struct()
if err != nil {
t.Fatal("NewHandle #2:", err)
}
handle2 := r2.Handle()
if n := hf.numHandles(); n != 1 {
t.Fatalf("after creation, numHandles = %d; want 1", n)
}
if err := handle1.Client.Close(); err != nil {
t.Error("handle1.Client.Close():", err)
}
flushConn(ctx, c)
if n := hf.numHandles(); n != 1 {
t.Errorf("after handle1.Client.Close(), numHandles = %d; want 1", n)
}
if err := handle2.Client.Close(); err != nil {
t.Error("handle2.Client.Close():", err)
}
flushConn(ctx, c)
if n := hf.numHandles(); n != 0 {
t.Errorf("after handle1.Close() and handle2.Close(), numHandles = %d; want 0", n)
}
}
func flushConn(ctx context.Context, c *rpc.Conn) {
// discard result
c.Bootstrap(ctx).Call(&capnp.Call{
Ctx: ctx,
Method: capnp.Method{InterfaceID: 0xdeadbeef, MethodID: 42},
}).Struct()
}
type Handle struct {
f *HandleFactory
}
func (h Handle) Close() error {
h.f.mu.Lock()
h.f.n--
h.f.mu.Unlock()
return nil
}
type HandleFactory struct {
n int
mu sync.Mutex
singleton testcapnp.Handle
}
func singletonHandleFactory() *HandleFactory {
hf := new(HandleFactory)
hf.singleton = testcapnp.Handle_ServerToClient(&Handle{f: hf})
return hf
}
func (hf *HandleFactory) NewHandle(call testcapnp.HandleFactory_newHandle) error {
server.Ack(call.Options)
if hf.singleton.Client == nil {
hf.mu.Lock()
hf.n++
hf.mu.Unlock()
call.Results.SetHandle(testcapnp.Handle_ServerToClient(&Handle{f: hf}))
} else {
hf.mu.Lock()
hf.n = 1
hf.mu.Unlock()
call.Results.SetHandle(hf.singleton)
}
return nil
}
func (hf *HandleFactory) numHandles() int {
hf.mu.Lock()
n := hf.n
hf.mu.Unlock()
return n
}

View File

@@ -1,629 +0,0 @@
package rpc_test
import (
"errors"
"flag"
"fmt"
"testing"
"golang.org/x/net/context"
"zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/rpc/internal/logtransport"
"zombiezen.com/go/capnproto2/rpc/internal/pipetransport"
rpccapnp "zombiezen.com/go/capnproto2/std/capnp/rpc"
)
const (
interfaceID uint64 = 0xa7317bd7216570aa
methodID uint16 = 9
bootstrapExportID uint32 = 84
)
var logMessages = flag.Bool("logmessages", false, "whether to log the transport in tests. Messages are always from client to server.")
type testLogger struct {
t interface {
Logf(format string, args ...interface{})
}
}
func (l testLogger) Infof(ctx context.Context, format string, args ...interface{}) {
l.t.Logf("conn log: "+format, args...)
}
func (l testLogger) Errorf(ctx context.Context, format string, args ...interface{}) {
l.t.Logf("conn log: "+format, args...)
}
func newUnpairedConn(t *testing.T, options ...rpc.ConnOption) (*rpc.Conn, rpc.Transport) {
p, q := pipetransport.New()
if *logMessages {
p = logtransport.New(nil, p)
}
newopts := make([]rpc.ConnOption, len(options), len(options)+1)
copy(newopts, options)
newopts = append(newopts, rpc.ConnLog(testLogger{t}))
c := rpc.NewConn(p, newopts...)
return c, q
}
func TestBootstrap(t *testing.T) {
ctx := context.Background()
conn, p := newUnpairedConn(t)
defer conn.Close()
defer p.Close()
clientCtx, cancel := context.WithCancel(ctx)
defer cancel()
readBootstrap(t, clientCtx, conn, p)
}
func readBootstrap(t *testing.T, ctx context.Context, conn *rpc.Conn, p rpc.Transport) (client capnp.Client, questionID uint32) {
clientCh := make(chan capnp.Client, 1)
go func() {
clientCh <- conn.Bootstrap(ctx)
}()
msg, err := p.RecvMessage(ctx)
if err != nil {
t.Fatal("Read Bootstrap failed:", err)
}
if msg.Which() != rpccapnp.Message_Which_bootstrap {
t.Fatalf("Received %v message from bootstrap, want Message_Which_bootstrap", msg.Which())
}
boot, err := msg.Bootstrap()
if err != nil {
t.Fatal("Read Bootstrap failed:", err)
}
questionID = boot.QuestionId()
// If this deadlocks, then Bootstrap isn't using a promised client.
client = <-clientCh
if client == nil {
t.Fatal("Bootstrap client is nil")
}
return
}
func TestBootstrapFulfilledSenderHosted(t *testing.T) {
testBootstrapFulfilled(t, false)
}
func TestBootstrapFulfilledSenderPromise(t *testing.T) {
testBootstrapFulfilled(t, true)
}
func testBootstrapFulfilled(t *testing.T, resultIsPromise bool) {
ctx := context.Background()
conn, p := newUnpairedConn(t)
defer conn.Close()
defer p.Close()
clientCtx, cancel := context.WithCancel(ctx)
defer cancel()
bootstrapAndFulfill(t, clientCtx, conn, p, resultIsPromise)
}
// Receive a Finish message for the given question ID.
//
// Immediately releases any capabilities in the message.
//
// An error is returned if any of the following occur:
//
// * An error occurs when reading the message
// * The message is not of type `Finish`
// * The message's question ID is not equal to `questionID`.
//
// Parameters:
//
// ctx: The context to be used when sending the message.
// p: The rpc.Transport to send the message on.
// questionID: The expected question ID.
func recvFinish(ctx context.Context, p rpc.Transport, questionID uint32) error {
if finish, err := p.RecvMessage(ctx); err != nil {
return err
} else if finish.Which() != rpccapnp.Message_Which_finish {
return fmt.Errorf("message sent is %v; want Message_Which_finish", finish.Which())
} else {
f, err := finish.Finish()
if err != nil {
return err
}
if id := f.QuestionId(); id != questionID {
return fmt.Errorf("finish question ID is %d; want %d", id, questionID)
}
if f.ReleaseResultCaps() {
return fmt.Errorf("finish released bootstrap capability")
}
}
return nil
}
// Send a Return message with a single capability to a bootstrap interface in
// its payload. Returns any error that occurs.
//
// Parameters:
//
// ctx: The context to be used when sending the message.
// p: The rpc.Transport to send the message on.
// answerId: The message's answerId.
// isPromise: If this is true, the capability in the response will be of type
// senderPromise, otherwise it will be of type senderHosted.
func sendBootstrapReturn(ctx context.Context, p rpc.Transport, answerId uint32, isPromise bool) error {
return sendMessage(ctx, p, func(msg rpccapnp.Message) error {
ret, err := msg.NewReturn()
if err != nil {
return err
}
ret.SetAnswerId(answerId)
payload, err := ret.NewResults()
if err != nil {
return err
}
payload.SetContent(capnp.NewInterface(msg.Segment(), 0))
capTable, err := payload.NewCapTable(1)
if err != nil {
return err
}
if isPromise {
capTable.At(0).SetSenderPromise(bootstrapExportID)
} else {
capTable.At(0).SetSenderHosted(bootstrapExportID)
}
return nil
})
}
func bootstrapAndFulfill(t *testing.T, ctx context.Context, conn *rpc.Conn, p rpc.Transport, resultIsPromise bool) capnp.Client {
client, bootstrapID := readBootstrap(t, ctx, conn, p)
if err := sendBootstrapReturn(ctx, p, bootstrapID, resultIsPromise); err != nil {
t.Fatalf("sendBootstrapReturn: %v", err)
}
if err := recvFinish(ctx, p, bootstrapID); err != nil {
t.Fatalf("recvFinish: %v", err)
}
return client
}
func TestCallOnPromisedAnswer(t *testing.T) {
ctx := context.Background()
conn, p := newUnpairedConn(t)
defer conn.Close()
defer p.Close()
client, bootstrapID := readBootstrap(t, ctx, conn, p)
readDone := startRecvMessage(p)
client.Call(&capnp.Call{
Ctx: ctx,
Method: capnp.Method{
InterfaceID: interfaceID,
MethodID: methodID,
},
ParamsSize: capnp.ObjectSize{DataSize: 8},
ParamsFunc: func(s capnp.Struct) error {
s.SetUint64(0, 42)
return nil
},
})
read := <-readDone
if read.err != nil {
t.Fatal("Reading failed:", read.err)
}
if read.msg.Which() != rpccapnp.Message_Which_call {
t.Fatalf("Conn sent %v message, want Message_Which_call", read.msg.Which())
}
call, err := read.msg.Call()
if err != nil {
t.Fatal("call error:", err)
}
if target, err := call.Target(); err != nil {
t.Fatal(err)
} else if target.Which() == rpccapnp.MessageTarget_Which_promisedAnswer {
if pa, err := target.PromisedAnswer(); err != nil {
t.Error("call.target.promisedAnswer error:", err)
} else {
if qid := pa.QuestionId(); qid != bootstrapID {
t.Errorf("Target question ID = %d; want %d", qid, bootstrapID)
}
// TODO(light): allow no-ops
if xform, err := pa.Transform(); err != nil {
t.Error("call.target.promisedAnswer.transform error:", err)
} else if xform.Len() != 0 {
t.Error("Target transform is non-empty")
}
}
} else {
t.Errorf("Target is %v, want MessageTarget_Which_promisedAnswer", target.Which())
}
if id := call.InterfaceId(); id != interfaceID {
t.Errorf("Interface ID = %x; want %x", id, interfaceID)
}
if id := call.MethodId(); id != methodID {
t.Errorf("Method ID = %d; want %d", id, methodID)
}
if params, err := call.Params(); err != nil {
t.Error("call.params error:", err)
} else {
if content, err := params.Content(); err != nil {
t.Error("call.params.content error:", err)
} else if x := capnp.ToStruct(content).Uint64(0); x != 42 {
t.Errorf("Params content value = %d; want %d", x, 42)
}
}
sendResultsTo := call.SendResultsTo()
if sendResultsTo.Which() != rpccapnp.Call_sendResultsTo_Which_caller {
t.Errorf("Send results to %v; want caller", sendResultsTo.Which())
}
}
func TestCallOnExportId_BootstrapIsPromise(t *testing.T) {
testCallOnExportId(t, true)
}
func TestCallOnExportId_BootstrapIsHosted(t *testing.T) {
testCallOnExportId(t, false)
}
func testCallOnExportId(t *testing.T, bootstrapIsPromise bool) {
ctx := context.Background()
conn, p := newUnpairedConn(t)
defer conn.Close()
defer p.Close()
client := bootstrapAndFulfill(t, ctx, conn, p, bootstrapIsPromise)
readDone := startRecvMessage(p)
client.Call(&capnp.Call{
Ctx: ctx,
Method: capnp.Method{
InterfaceID: interfaceID,
MethodID: methodID,
},
ParamsSize: capnp.ObjectSize{DataSize: 8},
ParamsFunc: func(s capnp.Struct) error {
s.SetUint64(0, 42)
return nil
},
})
read := <-readDone
if read.err != nil {
t.Fatal("Reading failed:", read.err)
}
call, err := read.msg.Call()
if err != nil {
t.Fatal("call error:", err)
}
if read.msg.Which() != rpccapnp.Message_Which_call {
t.Fatalf("Conn sent %v message, want Message_Which_call", read.msg.Which())
}
if target, err := call.Target(); err != nil {
t.Error("call.target error:", err)
} else if target.Which() != rpccapnp.MessageTarget_Which_importedCap {
t.Errorf("Target is %v, want MessageTarget_Which_importedCap", target.Which())
} else {
if id := target.ImportedCap(); id != bootstrapExportID {
t.Errorf("Target imported cap = %d; want %d", id, bootstrapExportID)
}
}
if id := call.InterfaceId(); id != interfaceID {
t.Errorf("Interface ID = %x; want %x", id, interfaceID)
}
if id := call.MethodId(); id != methodID {
t.Errorf("Method ID = %d; want %d", id, methodID)
}
if params, err := call.Params(); err != nil {
t.Error("call.params error:", err)
} else if content, err := params.Content(); err != nil {
t.Error("call.params.content error:", err)
} else if x := capnp.ToStruct(content).Uint64(0); x != 42 {
t.Errorf("Params content value = %d; want %d", x, 42)
}
if sendResultsTo := call.SendResultsTo(); err != nil {
t.Error("call.sendResultsTo error:", err)
} else if sendResultsTo.Which() != rpccapnp.Call_sendResultsTo_Which_caller {
t.Errorf("Send results to %v; want caller", sendResultsTo.Which())
}
}
func TestMainInterface(t *testing.T) {
main := mockClient()
conn, p := newUnpairedConn(t, rpc.MainInterface(main))
defer conn.Close()
defer p.Close()
bootstrapRoundtrip(t, p)
}
func bootstrapRoundtrip(t *testing.T, p rpc.Transport) (importID, questionID uint32) {
questionID = 54
err := sendMessage(context.TODO(), p, func(msg rpccapnp.Message) error {
bootstrap, err := msg.NewBootstrap()
if err != nil {
return err
}
bootstrap.SetQuestionId(questionID)
return nil
})
if err != nil {
t.Fatal("Write Bootstrap failed:", err)
}
msg, err := p.RecvMessage(context.TODO())
if err != nil {
t.Fatal("Read Bootstrap response failed:", err)
}
if msg.Which() != rpccapnp.Message_Which_return {
t.Fatalf("Conn sent %v message, want Message_Which_return", msg.Which())
}
ret, err := msg.Return()
if err != nil {
t.Fatal("return error:", err)
}
if id := ret.AnswerId(); id != questionID {
t.Fatalf("msg.Return().AnswerId() = %d; want %d", id, questionID)
}
if ret.Which() != rpccapnp.Return_Which_results {
t.Fatalf("msg.Return().Which() = %v; want Return_Which_results", ret.Which())
}
payload, err := ret.Results()
if err != nil {
t.Fatal("return.results error:", err)
}
content, err := payload.ContentPtr()
if err != nil {
t.Fatal("return.results.content error:", err)
}
in := content.Interface()
if !in.IsValid() {
t.Fatalf("Result payload contains %v; want interface", content)
}
capIdx := int(in.Capability())
capTable, err := payload.CapTable()
if err != nil {
t.Fatal("return.results.capTable error:", err)
}
if n := capTable.Len(); capIdx >= n {
t.Fatalf("Payload capTable has size %d, but capability index = %d", n, capIdx)
}
if cw := capTable.At(capIdx).Which(); cw != rpccapnp.CapDescriptor_Which_senderHosted {
t.Fatalf("Capability type is %d; want CapDescriptor_Which_senderHosted", cw)
}
return capTable.At(capIdx).SenderHosted(), questionID
}
func TestReceiveCallOnPromisedAnswer(t *testing.T) {
const questionID = 999
called := false
main := stubClient(func(ctx context.Context, params capnp.Struct) (capnp.Struct, error) {
msg, s, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
return capnp.Struct{}, err
}
result, err := capnp.NewStruct(s, capnp.ObjectSize{})
if err != nil {
return capnp.Struct{}, err
}
called = true
if err := msg.SetRoot(result); err != nil {
return capnp.Struct{}, err
}
return result, nil
})
conn, p := newUnpairedConn(t, rpc.MainInterface(main))
defer conn.Close()
defer p.Close()
_, bootqID := bootstrapRoundtrip(t, p)
err := sendMessage(context.TODO(), p, func(msg rpccapnp.Message) error {
call, err := msg.NewCall()
if err != nil {
return err
}
call.SetQuestionId(questionID)
call.SetInterfaceId(interfaceID)
call.SetMethodId(methodID)
target, err := call.NewTarget()
if err != nil {
return err
}
pa, err := target.NewPromisedAnswer()
if err != nil {
return err
}
pa.SetQuestionId(bootqID)
payload, err := call.NewParams()
if err != nil {
return err
}
content, err := capnp.NewStruct(msg.Segment(), capnp.ObjectSize{})
if err != nil {
return err
}
payload.SetContent(content)
return nil
})
if err != nil {
t.Fatal("Call message failed:", err)
}
retmsg, err := p.RecvMessage(context.TODO())
if err != nil {
t.Fatal("Read Call return failed:", err)
}
if !called {
t.Error("interface not called")
}
if retmsg.Which() != rpccapnp.Message_Which_return {
t.Fatalf("Return message is %v; want %v", retmsg.Which(), rpccapnp.Message_Which_return)
}
ret, err := retmsg.Return()
if err != nil {
t.Fatal("return error:", err)
}
if id := ret.AnswerId(); id != questionID {
t.Errorf("Return.answerId = %d; want %d", id, questionID)
}
if ret.Which() == rpccapnp.Return_Which_results {
// TODO(light)
} else if ret.Which() == rpccapnp.Return_Which_exception {
exc, _ := ret.Exception()
reason, _ := exc.Reason()
t.Error("Return.exception:", reason)
} else {
t.Errorf("Return.Which() = %v; want %v", ret.Which(), rpccapnp.Return_Which_results)
}
}
func TestReceiveCallOnExport(t *testing.T) {
const questionID = 999
called := false
main := stubClient(func(ctx context.Context, params capnp.Struct) (capnp.Struct, error) {
msg, s, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
return capnp.Struct{}, err
}
result, err := capnp.NewStruct(s, capnp.ObjectSize{})
if err != nil {
return capnp.Struct{}, err
}
called = true
if err := msg.SetRoot(result); err != nil {
return capnp.Struct{}, err
}
return result, nil
})
conn, p := newUnpairedConn(t, rpc.MainInterface(main))
defer conn.Close()
defer p.Close()
importID := sendBootstrapAndFinish(t, p)
err := sendMessage(context.TODO(), p, func(msg rpccapnp.Message) error {
call, err := msg.NewCall()
if err != nil {
return err
}
call.SetQuestionId(questionID)
call.SetInterfaceId(interfaceID)
call.SetMethodId(methodID)
target, err := call.NewTarget()
if err != nil {
return err
}
target.SetImportedCap(importID)
call.SetTarget(target)
payload, err := call.NewParams()
if err != nil {
return err
}
content, err := capnp.NewStruct(msg.Segment(), capnp.ObjectSize{})
if err != nil {
return err
}
payload.SetContent(content)
return nil
})
if err != nil {
t.Fatal("Call message failed:", err)
}
retmsg, err := p.RecvMessage(context.TODO())
if err != nil {
t.Fatal("Read Call return failed:", err)
}
if !called {
t.Error("interface not called")
}
if retmsg.Which() != rpccapnp.Message_Which_return {
t.Fatalf("Return message is %v; want %v", retmsg.Which(), rpccapnp.Message_Which_return)
}
ret, err := retmsg.Return()
if err != nil {
t.Fatal("return error:", err)
}
if id := ret.AnswerId(); id != questionID {
t.Errorf("Return.answerId = %d; want %d", id, questionID)
}
if ret.Which() == rpccapnp.Return_Which_results {
// TODO(light)
} else if ret.Which() == rpccapnp.Return_Which_exception {
exc, _ := ret.Exception()
reason, _ := exc.Reason()
t.Error("Return.exception:", reason)
} else {
t.Errorf("Return.Which() = %v; want %v", ret.Which(), rpccapnp.Return_Which_results)
}
}
func sendBootstrapAndFinish(t *testing.T, p rpc.Transport) (importID uint32) {
importID, questionID := bootstrapRoundtrip(t, p)
err := sendMessage(context.TODO(), p, func(msg rpccapnp.Message) error {
finish, err := msg.NewFinish()
if err != nil {
return err
}
finish.SetQuestionId(questionID)
finish.SetReleaseResultCaps(false)
return nil
})
if err != nil {
t.Fatal("Write Bootstrap Finish failed:", err)
}
return importID
}
func sendMessage(ctx context.Context, t rpc.Transport, f func(rpccapnp.Message) error) error {
_, s, err := capnp.NewMessage(capnp.SingleSegment(nil))
m, err := rpccapnp.NewRootMessage(s)
if err != nil {
return err
}
if err := f(m); err != nil {
return err
}
return t.SendMessage(ctx, m)
}
func startRecvMessage(t rpc.Transport) <-chan asyncRecv {
ch := make(chan asyncRecv, 1)
go func() {
msg, err := t.RecvMessage(context.TODO())
ch <- asyncRecv{msg, err}
}()
return ch
}
type asyncRecv struct {
msg rpccapnp.Message
err error
}
func mockClient() capnp.Client {
return capnp.ErrorClient(errMockClient)
}
type stubClient func(ctx context.Context, params capnp.Struct) (capnp.Struct, error)
func (stub stubClient) Call(call *capnp.Call) capnp.Answer {
if call.Method.InterfaceID != interfaceID || call.Method.MethodID != methodID {
return capnp.ErrorAnswer(errNotImplemented)
}
cc, err := call.PlaceParams(nil)
if err != nil {
return capnp.ErrorAnswer(err)
}
s, err := stub(call.Ctx, cc)
if err != nil {
return capnp.ErrorAnswer(err)
}
return capnp.ImmediateAnswer(s)
}
func (stub stubClient) Close() error {
return nil
}
var (
errMockClient = errors.New("rpc_test: mock client")
errNotImplemented = errors.New("rpc_test: stub client method not implemented")
)