infra
/
goutils
Archived
1
0
Fork 0

atobuf추가 / queue코드 정리

This commit is contained in:
Sangbum Kim 2018-06-09 12:31:21 +09:00
parent a3b94e2f5e
commit 302b7a3ded
6 changed files with 256 additions and 82 deletions

143
buf/atomic_buf.go Normal file
View File

@ -0,0 +1,143 @@
package buf
import (
"bytes"
"sync"
"io"
)
type AtomicBuffers interface {
Bytes() []byte
Cap() int
Grow(int)
Len() int
Next(int) []byte
ReadByte() (byte, error)
ReadBytes(byte) ([]byte, error)
ReadFrom(io.Reader) (int64, error)
ReadRune() (rune, int, error)
ReadString(byte) (string, error)
Reset()
Truncate(int)
UnreadByte() error
UnreadRune() error
WriteByte(byte) error
WriteRune(rune) (int, error)
WriteString(string) (int, error)
WriteTo(io.Writer) (int64, error)
Read([]byte) (int, error)
Write([]byte) (int, error)
String() string
}
type atomicBuffer struct {
b bytes.Buffer
m sync.RWMutex
}
func (b *atomicBuffer) Bytes() []byte {
b.m.RLock()
defer b.m.RUnlock()
return b.b.Bytes()
}
func (b *atomicBuffer) Cap() int {
b.m.RLock()
defer b.m.RUnlock()
return b.b.Cap()
}
func (b *atomicBuffer) Grow(n int) {
b.m.Lock()
defer b.m.Unlock()
b.b.Grow(n)
}
func (b *atomicBuffer) Len() int {
b.m.RLock()
defer b.m.RUnlock()
return b.b.Len()
}
func (b *atomicBuffer) Next(n int) []byte {
b.m.Lock()
defer b.m.Unlock()
return b.b.Next(n)
}
func (b *atomicBuffer) ReadByte() (c byte, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadByte()
}
func (b *atomicBuffer) ReadBytes(delim byte) (line []byte, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadBytes(delim)
}
func (b *atomicBuffer) ReadFrom(r io.Reader) (n int64, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadFrom(r)
}
func (b *atomicBuffer) ReadRune() (r rune, size int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadRune()
}
func (b *atomicBuffer) ReadString(delim byte) (line string, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadString(delim)
}
func (b *atomicBuffer) Reset() {
b.m.Lock()
defer b.m.Unlock()
b.b.Reset()
}
func (b *atomicBuffer) Truncate(n int) {
b.m.Lock()
defer b.m.Unlock()
b.b.Truncate(n)
}
func (b *atomicBuffer) UnreadByte() error {
b.m.Lock()
defer b.m.Unlock()
return b.b.UnreadByte()
}
func (b *atomicBuffer) UnreadRune() error {
b.m.Lock()
defer b.m.Unlock()
return b.b.UnreadRune()
}
func (b *atomicBuffer) WriteByte(c byte) error {
b.m.Lock()
defer b.m.Unlock()
return b.b.WriteByte(c)
}
func (b *atomicBuffer) WriteRune(r rune) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.WriteRune(r)
}
func (b *atomicBuffer) WriteString(s string) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.WriteString(s)
}
func (b *atomicBuffer) WriteTo(w io.Writer) (n int64, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.WriteTo(w)
}
func (b *atomicBuffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
func (b *atomicBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
func (b *atomicBuffer) String() string {
b.m.RLock()
defer b.m.RUnlock()
return b.b.String()
}

28
buf/atomic_buf_pool.go Normal file
View File

@ -0,0 +1,28 @@
package buf
import (
"sync"
)
// atomicBuffer pool to reduce GC
var atoBuffers = sync.Pool{
// New is called when a new instance is needed
New: func() interface{} {
return new(atomicBuffer)
},
}
// GetBuffer fetches a atomicBuffer from the pool
func GetAtoBuffer() AtomicBuffers {
return atoBuffers.Get().(AtomicBuffers)
}
// PutBuffer returns a atomicBuffer to the pool
func PutAtoBuffer(buf AtomicBuffers) {
bufPtr, ok := buf.(*atomicBuffer)
if !ok {
return
}
bufPtr.Reset()
atoBuffers.Put(bufPtr)
}

74
buf/bytebuf.go Normal file
View File

@ -0,0 +1,74 @@
package buf
import (
"unicode/utf8"
"unsafe"
)
// ByteBuffer provides byte buffer, which can be used with fasthttp API
// in order to minimize memory allocations.
//
// ByteBuffer may be used with functions appending data to the given []byte
// slice. See example code for details.
//
// Use AcquireByteBuffer for obtaining an empty byte buffer.
type ByteBuffer struct {
// B is a byte buffer to use in append-like workloads.
// See example code for details.
B []byte
}
func (b *ByteBuffer) WriteByte(c byte) error {
b.B = append(b.B, c)
return nil
}
// Write implements io.Writer - it appends p to ByteBuffer.B
func (b *ByteBuffer) Write(p []byte) (int, error) {
b.B = append(b.B, p...)
return len(p), nil
}
// WriteString appends s to ByteBuffer.B
func (b *ByteBuffer) WriteString(s string) (int, error) {
b.B = append(b.B, s...)
return len(s), nil
}
//(r rune) (n int, err error) {
// WriteString appends s to ByteBuffer.B
func (b *ByteBuffer) WriteRune(r rune) (n int, err error) {
if r < utf8.RuneSelf {
b.B = append(b.B, byte(r))
return 1, nil
}
curSize := len(b.B)
runCharBuf := b.B[curSize:curSize+utf8.UTFMax]
n = utf8.EncodeRune(runCharBuf, r)
b.B = b.B[:curSize+n]
return n, nil
}
// Set sets ByteBuffer.B to p
func (b *ByteBuffer) Set(p []byte) {
b.B = append(b.B[:0], p...)
}
// SetString sets ByteBuffer.B to s
func (b *ByteBuffer) SetString(s string) {
b.B = append(b.B[:0], s...)
}
// Reset makes ByteBuffer.B empty.
func (b *ByteBuffer) Reset() {
b.B = b.B[:0]
}
// String returns the accumulated string.
func (b *ByteBuffer) String() string {
return *(*string)(unsafe.Pointer(&b.B))
}
// Len returns the number of accumulated bytes; b.Len() == len(b.String()).
func (b *ByteBuffer) Len() int { return len(b.B) }

View File

@ -2,72 +2,12 @@ package buf
import ( import (
"sync" "sync"
"unicode/utf8"
) )
const ( const (
defaultByteBufferSize = 4096 defaultByteBufferSize = 4096
) )
// ByteBuffer provides byte buffer, which can be used with fasthttp API
// in order to minimize memory allocations.
//
// ByteBuffer may be used with functions appending data to the given []byte
// slice. See example code for details.
//
// Use AcquireByteBuffer for obtaining an empty byte buffer.
type ByteBuffer struct {
// B is a byte buffer to use in append-like workloads.
// See example code for details.
B []byte
}
func (b *ByteBuffer) WriteByte(c byte) error {
b.B = append(b.B, c)
return nil
}
// Write implements io.Writer - it appends p to ByteBuffer.B
func (b *ByteBuffer) Write(p []byte) (int, error) {
b.B = append(b.B, p...)
return len(p), nil
}
// WriteString appends s to ByteBuffer.B
func (b *ByteBuffer) WriteString(s string) (int, error) {
b.B = append(b.B, s...)
return len(s), nil
}
//(r rune) (n int, err error) {
// WriteString appends s to ByteBuffer.B
func (b *ByteBuffer) WriteRune(r rune) (n int, err error) {
if r < utf8.RuneSelf {
b.B = append(b.B, byte(r))
return 1, nil
}
curSize := len(b.B)
runCharBuf := b.B[curSize:curSize+utf8.UTFMax]
n = utf8.EncodeRune(runCharBuf, r)
b.B = b.B[:curSize+n]
return n, nil
}
// Set sets ByteBuffer.B to p
func (b *ByteBuffer) Set(p []byte) {
b.B = append(b.B[:0], p...)
}
// SetString sets ByteBuffer.B to s
func (b *ByteBuffer) SetString(s string) {
b.B = append(b.B[:0], s...)
}
// Reset makes ByteBuffer.B empty.
func (b *ByteBuffer) Reset() {
b.B = b.B[:0]
}
type ByteBufferPool struct { type ByteBufferPool struct {
pool sync.Pool pool sync.Pool
} }

View File

@ -4,6 +4,7 @@ import (
"container/list" "container/list"
) )
// 블럭되지 않는 큐체널
func NewBytesQueue() (chan<- []byte, <-chan []byte) { func NewBytesQueue() (chan<- []byte, <-chan []byte) {
send := make(chan []byte, 1) send := make(chan []byte, 1)
receive := make(chan []byte, 1) receive := make(chan []byte, 1)
@ -13,18 +14,14 @@ func NewBytesQueue() (chan<- []byte, <-chan []byte) {
func manageBytesQueue(send <-chan []byte, receive chan<- []byte) { func manageBytesQueue(send <-chan []byte, receive chan<- []byte) {
queue := list.New() queue := list.New()
defer close(receive)
for { for {
if front := queue.Front(); front == nil { if front := queue.Front(); front == nil {
if send == nil { if value, ok := <-send; ok {
close(receive)
return
}
value, ok := <-send
if !ok {
close(receive)
return
}
queue.PushBack(value) queue.PushBack(value)
} else {
break
}
} else { } else {
select { select {
case receive <- front.Value.([]byte): case receive <- front.Value.([]byte):
@ -32,8 +29,6 @@ func manageBytesQueue(send <-chan []byte, receive chan<- []byte) {
case value, ok := <-send: case value, ok := <-send:
if ok { if ok {
queue.PushBack(value) queue.PushBack(value)
} else {
send = nil
} }
} }
} }

View File

@ -14,18 +14,14 @@ func NewStringQueue() (chan<- string, <-chan string) {
func manageStringQueue(send <-chan string, receive chan<- string) { func manageStringQueue(send <-chan string, receive chan<- string) {
queue := list.New() queue := list.New()
defer close(receive)
for { for {
if front := queue.Front(); front == nil { if front := queue.Front(); front == nil {
if send == nil { if value, ok := <-send; ok {
close(receive)
return
}
value, ok := <-send
if !ok {
close(receive)
return
}
queue.PushBack(value) queue.PushBack(value)
} else {
break
}
} else { } else {
select { select {
case receive <- front.Value.(string): case receive <- front.Value.(string):
@ -33,8 +29,6 @@ func manageStringQueue(send <-chan string, receive chan<- string) {
case value, ok := <-send: case value, ok := <-send:
if ok { if ok {
queue.PushBack(value) queue.PushBack(value)
} else {
send = nil
} }
} }
} }