Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Encoding interface #101

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions encoding/msgpack/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package msgpack

import (
"encoding/binary"
"errors"
"io"
"reflect"
"time"

"github.com/dmcgowan/msgpack"
"github.com/docker/libchan"
"github.com/docker/libchan/encoding"
)

const (
duplexStreamCode = 1
inboundStreamCode = 2
outboundStreamCode = 3
inboundChannelCode = 4
outboundChannelCode = 5
timeCode = 6
)

type cproducer struct {
encoding.ChannelFactory
}

type creceiver struct {
encoding.ChannelReceiver
}

func decodeReferenceID(b []byte) (referenceID uint64, err error) {
if len(b) == 8 {
referenceID = binary.BigEndian.Uint64(b)
} else if len(b) == 4 {
referenceID = uint64(binary.BigEndian.Uint32(b))
} else {
err = errors.New("bad reference id")
}
return
}

func encodeReferenceID(referenceID uint64) []byte {
if referenceID > 0xffffffff {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, referenceID)
return buf
}
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(referenceID))
return buf
}

func (p *cproducer) copySendChannel(send libchan.Sender) (uint64, error) {
recv, copyID, err := p.CreateReceiver()
if err != nil {
return 0, err
}
// Start copying into sender
go func() {
libchan.Copy(send, recv)
send.Close()
}()
return copyID, nil
}

func (p *cproducer) copyReceiveChannel(recv libchan.Receiver) (uint64, error) {
send, copyID, err := p.CreateSender()
if err != nil {
return 0, err
}
// Start copying from receiver
go func() {
libchan.Copy(send, recv)
send.Close()
}()
return copyID, nil
}

func (r *creceiver) decodeStream(b []byte) (io.ReadWriteCloser, error) {
referenceID, err := decodeReferenceID(b)
if err != nil {
return nil, err
}

return r.GetStream(referenceID)
}

func (r *creceiver) decodeReceiver(v reflect.Value, b []byte) error {
referenceID, err := decodeReferenceID(b)
if err != nil {
return err
}

recv, err := r.GetReceiver(referenceID)
if err != nil {
return err
}

v.Set(reflect.ValueOf(recv))

return nil
}

func (r *creceiver) decodeSender(v reflect.Value, b []byte) error {
referenceID, err := decodeReferenceID(b)
if err != nil {
return err
}

send, err := r.GetSender(referenceID)
if err != nil {
return err
}

v.Set(reflect.ValueOf(send))

return nil
}

func (r *creceiver) decodeWStream(v reflect.Value, b []byte) error {
bs, err := r.decodeStream(b)
if err != nil {
return err
}

v.Set(reflect.ValueOf(bs))

return nil
}

func (r *creceiver) decodeRStream(v reflect.Value, b []byte) error {
bs, err := r.decodeStream(b)
if err != nil {
return err
}

v.Set(reflect.ValueOf(bs))

return bs.Close()
}

func entimeCode(t *time.Time) ([]byte, error) {
var b [12]byte
binary.BigEndian.PutUint64(b[0:8], uint64(t.Unix()))
binary.BigEndian.PutUint32(b[8:12], uint32(t.Nanosecond()))
return b[:], nil
}

func detimeCode(v reflect.Value, b []byte) error {
if len(b) != 12 {
return errors.New("Invalid length")
}
t := time.Unix(int64(binary.BigEndian.Uint64(b[0:8])), int64(binary.BigEndian.Uint32(b[8:12])))

if v.Kind() == reflect.Ptr {
v.Set(reflect.ValueOf(&t))
} else {
v.Set(reflect.ValueOf(t))
}

return nil
}

func (p *cproducer) encodeExtended(iv reflect.Value) (i int, b []byte, e error) {
switch v := iv.Interface().(type) {
case libchan.Sender:
copyCh, err := p.copySendChannel(v)
if err != nil {
return 0, nil, err
}
return inboundChannelCode, encodeReferenceID(copyCh), nil
case libchan.Receiver:
copyCh, err := p.copyReceiveChannel(v)
if err != nil {
return 0, nil, err
}
return outboundChannelCode, encodeReferenceID(copyCh), nil

case io.Reader:
// Either ReadWriteCloser, ReadWriter, or ReadCloser
streamCopy, copyID, err := p.CreateStream()
if err != nil {
return 0, nil, err
}
go func() {
io.Copy(streamCopy, v)
streamCopy.Close()
}()
code := outboundStreamCode
if wc, ok := v.(io.WriteCloser); ok {
go func() {
io.Copy(wc, streamCopy)
wc.Close()
}()
code = duplexStreamCode
} else if w, ok := v.(io.Writer); ok {
go func() {
io.Copy(w, streamCopy)
}()
code = duplexStreamCode
}
return code, encodeReferenceID(copyID), nil
case io.Writer:
streamCopy, copyID, err := p.CreateStream()
if err != nil {
return 0, nil, err
}
if wc, ok := v.(io.WriteCloser); ok {
go func() {
io.Copy(wc, streamCopy)
wc.Close()
}()
} else {
go func() {
io.Copy(v, streamCopy)
}()
}
return inboundStreamCode, encodeReferenceID(copyID), nil
case *time.Time:
b, err := entimeCode(v)
return timeCode, b, err
}
return 0, nil, nil
}

// Codec implements the libchan encoding using msgpack5.
type Codec struct{}

// NewEncoder returns a libchan encoder which encodes given objects
// to msgpack5 on the given datastream using the given encoding
// channel producer.
func (codec *Codec) NewEncoder(w io.Writer, p encoding.ChannelFactory) encoding.Encoder {
prd := &cproducer{p}
encoder := msgpack.NewEncoder(w)
exts := msgpack.NewExtensions()
exts.SetEncoder(prd.encodeExtended)
encoder.AddExtensions(exts)
return encoder
}

// NewDecoder returns a libchan decoder which decodes objects from
// the given data stream from msgpack5 into provided object using
// the provided types for libchan interfaces.
func (codec *Codec) NewDecoder(r io.Reader, recv encoding.ChannelReceiver, streamT, recvT, sendT reflect.Type) encoding.Decoder {
rec := &creceiver{recv}
decoder := msgpack.NewDecoder(r)
exts := msgpack.NewExtensions()
exts.AddDecoder(duplexStreamCode, streamT, rec.decodeWStream)
exts.AddDecoder(inboundStreamCode, streamT, rec.decodeWStream)
exts.AddDecoder(outboundStreamCode, streamT, rec.decodeRStream)
exts.AddDecoder(inboundChannelCode, sendT, rec.decodeSender)
exts.AddDecoder(outboundChannelCode, recvT, rec.decodeReceiver)
exts.AddDecoder(timeCode, reflect.TypeOf(&time.Time{}), detimeCode)
decoder.AddExtensions(exts)
return decoder
}

// NewRawMessage returns a transit object which will copy a
// msgpack5 datastream and allow decoding that object
// using a Decoder from the codec object.
func (codec *Codec) NewRawMessage() encoding.Decoder {
return new(msgpack.RawMessage)
}
78 changes: 78 additions & 0 deletions encoding/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package encoding

import (
"io"
"reflect"

"github.com/docker/libchan"
)

// ChannelFactory represents an object which is able to create new
// channels and streams. This interface is used by an encoder
// create a channel or stream, copy the encoded type, and
// encode the identifier.
type ChannelFactory interface {
// CreateSender creates a new send channel and returns
// the identifier associated with the sender. This
// identifier can be used to get the Receiver on
// the receiving side by calling GetReceiver.
CreateSender() (libchan.Sender, uint64, error)

// CreateReceiver creates a new receive channel and
// returns the identifier associated with the receiver.
// This identifier can be used to get the Sender on
// the receiving side by calling GetSender.
CreateReceiver() (libchan.Receiver, uint64, error)

// CreateStream createsa new byte stream and returns
// the identifier associate with the stream. This
// identifier can be used to get the byte stream
// by calling GetStream on the receiving side.
CreateStream() (io.ReadWriteCloser, uint64, error)
}

// ChannelReceiver represents an object which is able to receive
// new channels and streams and retrieve by an integer identifer.
type ChannelReceiver interface {
// GetSender gets a remotely created sender referenced
// by the given identifier.
GetSender(uint64) (libchan.Sender, error)

// GetReceiver gets a remotely created receiver referenced
// by the given identifier.
GetReceiver(uint64) (libchan.Receiver, error)

// GetStream gets a remotely created byte stream
// referenced by the given identifier.
GetStream(uint64) (io.ReadWriteCloser, error)
}

// Encoder represents an object which can encode an interface
// into data stream to be decoded. This Encoder must be able
// to encode interfaces by converting to libchan channels and
// streams and encoding the identifier.
type Encoder interface {
Encode(v ...interface{}) error
}

// Decoder represents an object which can decode from a data
// stream into an interface. The decoder must have support
// for decoding stream and channel identifiers into a libchan
// Sender or Receiver as well as io Readers and Writers.
type Decoder interface {
Decode(v ...interface{}) error
}

// ChannelCodec represents a libchan codec capable of encoding
// Go interfaces into data streams supporting libchan types as
// well as decode into libchan supported interfaces. In addition
// to encoding and decoding, the codec must provide a transit
// type which is capable of copying a data stream in order to
// delay decoding into an object until finally received.
// The RawMessage must return an object similar to json.RawMessage
// with the capability of decoding itself into an object.
type ChannelCodec interface {
NewEncoder(io.Writer, ChannelFactory) Encoder
NewDecoder(io.Reader, ChannelReceiver, reflect.Type, reflect.Type, reflect.Type) Decoder
NewRawMessage() Decoder
}
3 changes: 2 additions & 1 deletion examples/rexec/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"

"github.com/docker/libchan"
"github.com/docker/libchan/encoding/msgpack"
"github.com/docker/libchan/spdy"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
transport := spdy.NewTransport(p)
transport := spdy.NewTransport(p, &msgpack.Codec{})
sender, err := transport.NewSendChannel()
if err != nil {
log.Fatal(err)
Expand Down
3 changes: 2 additions & 1 deletion examples/rexec/rexec_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"

"github.com/docker/libchan"
"github.com/docker/libchan/encoding/msgpack"
"github.com/docker/libchan/spdy"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func main() {
log.Print(err)
break
}
t := spdy.NewTransport(p)
t := spdy.NewTransport(p, &msgpack.Codec{})

go func() {
for {
Expand Down
4 changes: 0 additions & 4 deletions spdy/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
"github.com/docker/libchan"
)

var (
testPipe = Pipe
)

type SimpleStruct struct {
Value int
}
Expand Down
Loading