Merge pull request #711 from harshavardhana/pr_out_avoid_config_reload_all_the_time_reload_is_manually_triggerred_from_outside

This commit is contained in:
Harshavardhana 2015-07-07 00:29:46 +00:00
commit 5132fd84db
28 changed files with 3271 additions and 104 deletions

12
Godeps/Godeps.json generated
View File

@ -9,6 +9,18 @@
"ImportPath": "github.com/dustin/go-humanize",
"Rev": "8cc1aaa2d955ee82833337cfb10babc42be6bce6"
},
{
"ImportPath": "github.com/facebookgo/clock",
"Rev": "600d898af40aa09a7a93ecb9265d87b0504b6f03"
},
{
"ImportPath": "github.com/facebookgo/httpdown",
"Rev": "3d94c3159d8ba15fa8e9499134ccc0d8acf6adb7"
},
{
"ImportPath": "github.com/facebookgo/stats",
"Rev": "31fb71caf5a4f04c9f8bb3fa8e7c2597ba6eb50a"
},
{
"ImportPath": "github.com/fatih/structs",
"Rev": "c00d27128bb88e9c1adab1a53cda9c72c6d1ff9b"

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Ben Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,104 @@
clock [![Build Status](https://drone.io/github.com/benbjohnson/clock/status.png)](https://drone.io/github.com/benbjohnson/clock/latest) [![Coverage Status](https://coveralls.io/repos/benbjohnson/clock/badge.png?branch=master)](https://coveralls.io/r/benbjohnson/clock?branch=master) [![GoDoc](https://godoc.org/github.com/benbjohnson/clock?status.png)](https://godoc.org/github.com/benbjohnson/clock) ![Project status](http://img.shields.io/status/experimental.png?color=red)
=====
Clock is a small library for mocking time in Go. It provides an interface
around the standard library's [`time`][time] package so that the application
can use the realtime clock while tests can use the mock clock.
[time]: http://golang.org/pkg/time/
## Usage
### Realtime Clock
Your application can maintain a `Clock` variable that will allow realtime and
mock clocks to be interchangable. For example, if you had an `Application` type:
```go
import "github.com/benbjohnson/clock"
type Application struct {
Clock clock.Clock
}
```
You could initialize it to use the realtime clock like this:
```go
var app Application
app.Clock = clock.New()
...
```
Then all timers and time-related functionality should be performed from the
`Clock` variable.
### Mocking time
In your tests, you will want to use a `Mock` clock:
```go
import (
"testing"
"github.com/benbjohnson/clock"
)
func TestApplication_DoSomething(t *testing.T) {
mock := clock.NewMock()
app := Application{Clock: mock}
...
}
```
Now that you've initialized your application to use the mock clock, you can
adjust the time programmatically. The mock clock always starts from the Unix
epoch (midnight, Jan 1, 1970 UTC).
### Controlling time
The mock clock provides the same functions that the standard library's `time`
package provides. For example, to find the current time, you use the `Now()`
function:
```go
mock := clock.NewMock()
// Find the current time.
mock.Now().UTC() // 1970-01-01 00:00:00 +0000 UTC
// Move the clock forward.
mock.Add(2 * time.Hour)
// Check the time again. It's 2 hours later!
mock.Now().UTC() // 1970-01-01 02:00:00 +0000 UTC
```
Timers and Tickers are also controlled by this same mock clock. They will only
execute when the clock is moved forward:
```
mock := clock.NewMock()
count := 0
// Kick off a timer to increment every 1 mock second.
go func() {
ticker := clock.Ticker(1 * time.Second)
for {
<-ticker.C
count++
}
}()
runtime.Gosched()
// Move the clock forward 10 second.
mock.Add(10 * time.Second)
// This prints 10.
fmt.Println(count)
```

View File

@ -0,0 +1,363 @@
package clock
import (
"runtime"
"sort"
"sync"
"time"
)
// Clock represents an interface to the functions in the standard library time
// package. Two implementations are available in the clock package. The first
// is a real-time clock which simply wraps the time package's functions. The
// second is a mock clock which will only make forward progress when
// programmatically adjusted.
type Clock interface {
After(d time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) *Timer
Now() time.Time
Sleep(d time.Duration)
Tick(d time.Duration) <-chan time.Time
Ticker(d time.Duration) *Ticker
Timer(d time.Duration) *Timer
}
// New returns an instance of a real-time clock.
func New() Clock {
return &clock{}
}
// clock implements a real-time clock by simply wrapping the time package functions.
type clock struct{}
func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) }
func (c *clock) AfterFunc(d time.Duration, f func()) *Timer {
return &Timer{timer: time.AfterFunc(d, f)}
}
func (c *clock) Now() time.Time { return time.Now() }
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) }
func (c *clock) Ticker(d time.Duration) *Ticker {
t := time.NewTicker(d)
return &Ticker{C: t.C, ticker: t}
}
func (c *clock) Timer(d time.Duration) *Timer {
t := time.NewTimer(d)
return &Timer{C: t.C, timer: t}
}
// Mock represents a mock clock that only moves forward programmically.
// It can be preferable to a real-time clock when testing time-based functionality.
type Mock struct {
mu sync.Mutex
now time.Time // current time
timers clockTimers // tickers & timers
calls Calls
waiting []waiting
callsMutex sync.Mutex
}
// NewMock returns an instance of a mock clock.
// The current time of the mock clock on initialization is the Unix epoch.
func NewMock() *Mock {
return &Mock{now: time.Unix(0, 0)}
}
// Add moves the current time of the mock clock forward by the duration.
// This should only be called from a single goroutine at a time.
func (m *Mock) Add(d time.Duration) {
// Calculate the final current time.
t := m.now.Add(d)
// Continue to execute timers until there are no more before the new time.
for {
if !m.runNextTimer(t) {
break
}
}
// Ensure that we end with the new time.
m.mu.Lock()
m.now = t
m.mu.Unlock()
// Give a small buffer to make sure the other goroutines get handled.
gosched()
}
// runNextTimer executes the next timer in chronological order and moves the
// current time to the timer's next tick time. The next time is not executed if
// it's next time if after the max time. Returns true if a timer is executed.
func (m *Mock) runNextTimer(max time.Time) bool {
m.mu.Lock()
// Sort timers by time.
sort.Sort(m.timers)
// If we have no more timers then exit.
if len(m.timers) == 0 {
m.mu.Unlock()
return false
}
// Retrieve next timer. Exit if next tick is after new time.
t := m.timers[0]
if t.Next().After(max) {
m.mu.Unlock()
return false
}
// Move "now" forward and unlock clock.
m.now = t.Next()
m.mu.Unlock()
// Execute timer.
t.Tick(m.now)
return true
}
// After waits for the duration to elapse and then sends the current time on the returned channel.
func (m *Mock) After(d time.Duration) <-chan time.Time {
defer m.inc(&m.calls.After)
return m.Timer(d).C
}
// AfterFunc waits for the duration to elapse and then executes a function.
// A Timer is returned that can be stopped.
func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer {
defer m.inc(&m.calls.AfterFunc)
t := m.Timer(d)
t.C = nil
t.fn = f
return t
}
// Now returns the current wall time on the mock clock.
func (m *Mock) Now() time.Time {
defer m.inc(&m.calls.Now)
m.mu.Lock()
defer m.mu.Unlock()
return m.now
}
// Sleep pauses the goroutine for the given duration on the mock clock.
// The clock must be moved forward in a separate goroutine.
func (m *Mock) Sleep(d time.Duration) {
defer m.inc(&m.calls.Sleep)
<-m.After(d)
}
// Tick is a convenience function for Ticker().
// It will return a ticker channel that cannot be stopped.
func (m *Mock) Tick(d time.Duration) <-chan time.Time {
defer m.inc(&m.calls.Tick)
return m.Ticker(d).C
}
// Ticker creates a new instance of Ticker.
func (m *Mock) Ticker(d time.Duration) *Ticker {
defer m.inc(&m.calls.Ticker)
m.mu.Lock()
defer m.mu.Unlock()
ch := make(chan time.Time)
t := &Ticker{
C: ch,
c: ch,
mock: m,
d: d,
next: m.now.Add(d),
}
m.timers = append(m.timers, (*internalTicker)(t))
return t
}
// Timer creates a new instance of Timer.
func (m *Mock) Timer(d time.Duration) *Timer {
defer m.inc(&m.calls.Timer)
m.mu.Lock()
defer m.mu.Unlock()
ch := make(chan time.Time)
t := &Timer{
C: ch,
c: ch,
mock: m,
next: m.now.Add(d),
}
m.timers = append(m.timers, (*internalTimer)(t))
return t
}
func (m *Mock) removeClockTimer(t clockTimer) {
m.mu.Lock()
defer m.mu.Unlock()
for i, timer := range m.timers {
if timer == t {
copy(m.timers[i:], m.timers[i+1:])
m.timers[len(m.timers)-1] = nil
m.timers = m.timers[:len(m.timers)-1]
break
}
}
sort.Sort(m.timers)
}
func (m *Mock) inc(addr *uint32) {
m.callsMutex.Lock()
defer m.callsMutex.Unlock()
*addr++
var newWaiting []waiting
for _, w := range m.waiting {
if m.calls.atLeast(w.expected) {
close(w.done)
continue
}
newWaiting = append(newWaiting, w)
}
m.waiting = newWaiting
}
// Wait waits for at least the relevant calls before returning. The expected
// Calls are always over the lifetime of the Mock. Values in the Calls struct
// are used as the minimum number of calls, this allows you to wait for only
// the calls you care about.
func (m *Mock) Wait(s Calls) {
m.callsMutex.Lock()
if m.calls.atLeast(s) {
m.callsMutex.Unlock()
return
}
done := make(chan struct{})
m.waiting = append(m.waiting, waiting{expected: s, done: done})
m.callsMutex.Unlock()
<-done
}
// clockTimer represents an object with an associated start time.
type clockTimer interface {
Next() time.Time
Tick(time.Time)
}
// clockTimers represents a list of sortable timers.
type clockTimers []clockTimer
func (a clockTimers) Len() int { return len(a) }
func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) }
// Timer represents a single event.
// The current time will be sent on C, unless the timer was created by AfterFunc.
type Timer struct {
C <-chan time.Time
c chan time.Time
timer *time.Timer // realtime impl, if set
next time.Time // next tick time
mock *Mock // mock clock, if set
fn func() // AfterFunc function, if set
}
// Stop turns off the ticker.
func (t *Timer) Stop() {
if t.timer != nil {
t.timer.Stop()
} else {
t.mock.removeClockTimer((*internalTimer)(t))
}
}
type internalTimer Timer
func (t *internalTimer) Next() time.Time { return t.next }
func (t *internalTimer) Tick(now time.Time) {
if t.fn != nil {
t.fn()
} else {
t.c <- now
}
t.mock.removeClockTimer((*internalTimer)(t))
gosched()
}
// Ticker holds a channel that receives "ticks" at regular intervals.
type Ticker struct {
C <-chan time.Time
c chan time.Time
ticker *time.Ticker // realtime impl, if set
next time.Time // next tick time
mock *Mock // mock clock, if set
d time.Duration // time between ticks
}
// Stop turns off the ticker.
func (t *Ticker) Stop() {
if t.ticker != nil {
t.ticker.Stop()
} else {
t.mock.removeClockTimer((*internalTicker)(t))
}
}
type internalTicker Ticker
func (t *internalTicker) Next() time.Time { return t.next }
func (t *internalTicker) Tick(now time.Time) {
select {
case t.c <- now:
case <-time.After(1 * time.Millisecond):
}
t.next = now.Add(t.d)
gosched()
}
// Sleep momentarily so that other goroutines can process.
func gosched() { runtime.Gosched() }
// Calls keeps track of the count of calls for each of the methods on the Clock
// interface.
type Calls struct {
After uint32
AfterFunc uint32
Now uint32
Sleep uint32
Tick uint32
Ticker uint32
Timer uint32
}
// atLeast returns true if at least the number of calls in o have been made.
func (c Calls) atLeast(o Calls) bool {
if c.After < o.After {
return false
}
if c.AfterFunc < o.AfterFunc {
return false
}
if c.Now < o.Now {
return false
}
if c.Sleep < o.Sleep {
return false
}
if c.Tick < o.Tick {
return false
}
if c.Ticker < o.Ticker {
return false
}
if c.Timer < o.Timer {
return false
}
return true
}
type waiting struct {
expected Calls
done chan struct{}
}

View File

@ -0,0 +1,536 @@
package clock_test
import (
"fmt"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/facebookgo/clock"
)
// Ensure that the clock's After channel sends at the correct time.
func TestClock_After(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(30 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
<-clock.New().After(20 * time.Millisecond)
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock's AfterFunc executes at the correct time.
func TestClock_AfterFunc(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(30 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
var wg sync.WaitGroup
wg.Add(1)
clock.New().AfterFunc(20*time.Millisecond, func() {
wg.Done()
})
wg.Wait()
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock's time matches the standary library.
func TestClock_Now(t *testing.T) {
a := time.Now().Round(time.Second)
b := clock.New().Now().Round(time.Second)
if !a.Equal(b) {
t.Errorf("not equal: %s != %s", a, b)
}
}
// Ensure that the clock sleeps for the appropriate amount of time.
func TestClock_Sleep(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(30 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
clock.New().Sleep(20 * time.Millisecond)
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock ticks correctly.
func TestClock_Tick(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(50 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
c := clock.New().Tick(20 * time.Millisecond)
<-c
<-c
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock's ticker ticks correctly.
func TestClock_Ticker(t *testing.T) {
var ok bool
go func() {
time.Sleep(100 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(200 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
ticker := clock.New().Ticker(50 * time.Millisecond)
<-ticker.C
<-ticker.C
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock's ticker can stop correctly.
func TestClock_Ticker_Stp(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
gosched()
ticker := clock.New().Ticker(20 * time.Millisecond)
<-ticker.C
ticker.Stop()
select {
case <-ticker.C:
t.Fatal("unexpected send")
case <-time.After(30 * time.Millisecond):
}
}
// Ensure that the clock's timer waits correctly.
func TestClock_Timer(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
go func() {
time.Sleep(30 * time.Millisecond)
t.Fatal("too late")
}()
gosched()
timer := clock.New().Timer(20 * time.Millisecond)
<-timer.C
if !ok {
t.Fatal("too early")
}
}
// Ensure that the clock's timer can be stopped.
func TestClock_Timer_Stop(t *testing.T) {
var ok bool
go func() {
time.Sleep(10 * time.Millisecond)
ok = true
}()
timer := clock.New().Timer(20 * time.Millisecond)
timer.Stop()
select {
case <-timer.C:
t.Fatal("unexpected send")
case <-time.After(30 * time.Millisecond):
}
}
// Ensure that the mock's After channel sends at the correct time.
func TestMock_After(t *testing.T) {
var ok int32
clock := clock.NewMock()
// Create a channel to execute after 10 mock seconds.
ch := clock.After(10 * time.Second)
go func(ch <-chan time.Time) {
<-ch
atomic.StoreInt32(&ok, 1)
}(ch)
// Move clock forward to just before the time.
clock.Add(9 * time.Second)
if atomic.LoadInt32(&ok) == 1 {
t.Fatal("too early")
}
// Move clock forward to the after channel's time.
clock.Add(1 * time.Second)
if atomic.LoadInt32(&ok) == 0 {
t.Fatal("too late")
}
}
// Ensure that the mock's AfterFunc executes at the correct time.
func TestMock_AfterFunc(t *testing.T) {
var ok int32
clock := clock.NewMock()
// Execute function after duration.
clock.AfterFunc(10*time.Second, func() {
atomic.StoreInt32(&ok, 1)
})
// Move clock forward to just before the time.
clock.Add(9 * time.Second)
if atomic.LoadInt32(&ok) == 1 {
t.Fatal("too early")
}
// Move clock forward to the after channel's time.
clock.Add(1 * time.Second)
if atomic.LoadInt32(&ok) == 0 {
t.Fatal("too late")
}
}
// Ensure that the mock's AfterFunc doesn't execute if stopped.
func TestMock_AfterFunc_Stop(t *testing.T) {
// Execute function after duration.
clock := clock.NewMock()
timer := clock.AfterFunc(10*time.Second, func() {
t.Fatal("unexpected function execution")
})
gosched()
// Stop timer & move clock forward.
timer.Stop()
clock.Add(10 * time.Second)
gosched()
}
// Ensure that the mock's current time can be changed.
func TestMock_Now(t *testing.T) {
clock := clock.NewMock()
if now := clock.Now(); !now.Equal(time.Unix(0, 0)) {
t.Fatalf("expected epoch, got: ", now)
}
// Add 10 seconds and check the time.
clock.Add(10 * time.Second)
if now := clock.Now(); !now.Equal(time.Unix(10, 0)) {
t.Fatalf("expected epoch, got: ", now)
}
}
// Ensure that the mock can sleep for the correct time.
func TestMock_Sleep(t *testing.T) {
var ok int32
clock := clock.NewMock()
// Create a channel to execute after 10 mock seconds.
go func() {
clock.Sleep(10 * time.Second)
atomic.StoreInt32(&ok, 1)
}()
gosched()
// Move clock forward to just before the sleep duration.
clock.Add(9 * time.Second)
if atomic.LoadInt32(&ok) == 1 {
t.Fatal("too early")
}
// Move clock forward to the after the sleep duration.
clock.Add(1 * time.Second)
if atomic.LoadInt32(&ok) == 0 {
t.Fatal("too late")
}
}
// Ensure that the mock's Tick channel sends at the correct time.
func TestMock_Tick(t *testing.T) {
var n int32
clock := clock.NewMock()
// Create a channel to increment every 10 seconds.
go func() {
tick := clock.Tick(10 * time.Second)
for {
<-tick
atomic.AddInt32(&n, 1)
}
}()
gosched()
// Move clock forward to just before the first tick.
clock.Add(9 * time.Second)
if atomic.LoadInt32(&n) != 0 {
t.Fatalf("expected 0, got %d", n)
}
// Move clock forward to the start of the first tick.
clock.Add(1 * time.Second)
if atomic.LoadInt32(&n) != 1 {
t.Fatalf("expected 1, got %d", n)
}
// Move clock forward over several ticks.
clock.Add(30 * time.Second)
if atomic.LoadInt32(&n) != 4 {
t.Fatalf("expected 4, got %d", n)
}
}
// Ensure that the mock's Ticker channel sends at the correct time.
func TestMock_Ticker(t *testing.T) {
var n int32
clock := clock.NewMock()
// Create a channel to increment every microsecond.
go func() {
ticker := clock.Ticker(1 * time.Microsecond)
for {
<-ticker.C
atomic.AddInt32(&n, 1)
}
}()
gosched()
// Move clock forward.
clock.Add(10 * time.Microsecond)
if atomic.LoadInt32(&n) != 10 {
t.Fatalf("unexpected: %d", n)
}
}
// Ensure that the mock's Ticker channel won't block if not read from.
func TestMock_Ticker_Overflow(t *testing.T) {
clock := clock.NewMock()
ticker := clock.Ticker(1 * time.Microsecond)
clock.Add(10 * time.Microsecond)
ticker.Stop()
}
// Ensure that the mock's Ticker can be stopped.
func TestMock_Ticker_Stop(t *testing.T) {
var n int32
clock := clock.NewMock()
// Create a channel to increment every second.
ticker := clock.Ticker(1 * time.Second)
go func() {
for {
<-ticker.C
atomic.AddInt32(&n, 1)
}
}()
gosched()
// Move clock forward.
clock.Add(5 * time.Second)
if atomic.LoadInt32(&n) != 5 {
t.Fatalf("expected 5, got: %d", n)
}
ticker.Stop()
// Move clock forward again.
clock.Add(5 * time.Second)
if atomic.LoadInt32(&n) != 5 {
t.Fatalf("still expected 5, got: %d", n)
}
}
// Ensure that multiple tickers can be used together.
func TestMock_Ticker_Multi(t *testing.T) {
var n int32
clock := clock.NewMock()
go func() {
a := clock.Ticker(1 * time.Microsecond)
b := clock.Ticker(3 * time.Microsecond)
for {
select {
case <-a.C:
atomic.AddInt32(&n, 1)
case <-b.C:
atomic.AddInt32(&n, 100)
}
}
}()
gosched()
// Move clock forward.
clock.Add(10 * time.Microsecond)
gosched()
if atomic.LoadInt32(&n) != 310 {
t.Fatalf("unexpected: %d", n)
}
}
func ExampleMock_After() {
// Create a new mock clock.
clock := clock.NewMock()
count := 0
// Create a channel to execute after 10 mock seconds.
go func() {
<-clock.After(10 * time.Second)
count = 100
}()
runtime.Gosched()
// Print the starting value.
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Move the clock forward 5 seconds and print the value again.
clock.Add(5 * time.Second)
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Move the clock forward 5 seconds to the tick time and check the value.
clock.Add(5 * time.Second)
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Output:
// 1970-01-01 00:00:00 +0000 UTC: 0
// 1970-01-01 00:00:05 +0000 UTC: 0
// 1970-01-01 00:00:10 +0000 UTC: 100
}
func ExampleMock_AfterFunc() {
// Create a new mock clock.
clock := clock.NewMock()
count := 0
// Execute a function after 10 mock seconds.
clock.AfterFunc(10*time.Second, func() {
count = 100
})
runtime.Gosched()
// Print the starting value.
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Move the clock forward 10 seconds and print the new value.
clock.Add(10 * time.Second)
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Output:
// 1970-01-01 00:00:00 +0000 UTC: 0
// 1970-01-01 00:00:10 +0000 UTC: 100
}
func ExampleMock_Sleep() {
// Create a new mock clock.
clock := clock.NewMock()
count := 0
// Execute a function after 10 mock seconds.
go func() {
clock.Sleep(10 * time.Second)
count = 100
}()
runtime.Gosched()
// Print the starting value.
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Move the clock forward 10 seconds and print the new value.
clock.Add(10 * time.Second)
fmt.Printf("%s: %d\n", clock.Now().UTC(), count)
// Output:
// 1970-01-01 00:00:00 +0000 UTC: 0
// 1970-01-01 00:00:10 +0000 UTC: 100
}
func ExampleMock_Ticker() {
// Create a new mock clock.
clock := clock.NewMock()
count := 0
// Increment count every mock second.
go func() {
ticker := clock.Ticker(1 * time.Second)
for {
<-ticker.C
count++
}
}()
runtime.Gosched()
// Move the clock forward 10 seconds and print the new value.
clock.Add(10 * time.Second)
fmt.Printf("Count is %d after 10 seconds\n", count)
// Move the clock forward 5 more seconds and print the new value.
clock.Add(5 * time.Second)
fmt.Printf("Count is %d after 15 seconds\n", count)
// Output:
// Count is 10 after 10 seconds
// Count is 15 after 15 seconds
}
func ExampleMock_Timer() {
// Create a new mock clock.
clock := clock.NewMock()
count := 0
// Increment count after a mock second.
go func() {
timer := clock.Timer(1 * time.Second)
<-timer.C
count++
}()
runtime.Gosched()
// Move the clock forward 10 seconds and print the new value.
clock.Add(10 * time.Second)
fmt.Printf("Count is %d after 10 seconds\n", count)
// Output:
// Count is 1 after 10 seconds
}
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
func gosched() { time.Sleep(1 * time.Millisecond) }

View File

@ -0,0 +1,23 @@
language: go
go:
- 1.3
matrix:
fast_finish: true
before_install:
- go get -v code.google.com/p/go.tools/cmd/vet
- go get -v github.com/golang/lint/golint
- go get -v code.google.com/p/go.tools/cmd/cover
install:
- go install -race -v std
- go get -race -t -v ./...
- go install -race -v ./...
script:
- go vet ./...
- $HOME/gopath/bin/golint .
- go test -cpu=2 -race -v ./...
- go test -cpu=2 -covermode=atomic ./...

View File

@ -0,0 +1,380 @@
// Package httpdown provides http.ConnState enabled graceful termination of
// http.Server.
package httpdown
import (
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/facebookgo/clock"
"github.com/facebookgo/stats"
)
const (
defaultStopTimeout = time.Minute
defaultKillTimeout = time.Minute
)
// A Server allows encapsulates the process of accepting new connections and
// serving them, and gracefully shutting down the listener without dropping
// active connections.
type Server interface {
// Wait waits for the serving loop to finish. This will happen when Stop is
// called, at which point it returns no error, or if there is an error in the
// serving loop. You must call Wait after calling Serve or ListenAndServe.
Wait() error
// Stop stops the listener. It will block until all connections have been
// closed.
Stop() error
}
// HTTP defines the configuration for serving a http.Server. Multiple calls to
// Serve or ListenAndServe can be made on the same HTTP instance. The default
// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop()
// returns.
type HTTP struct {
// StopTimeout is the duration before we begin force closing connections.
// Defaults to 1 minute.
StopTimeout time.Duration
// KillTimeout is the duration before which we completely give up and abort
// even though we still have connected clients. This is useful when a large
// number of client connections exist and closing them can take a long time.
// Note, this is in addition to the StopTimeout. Defaults to 1 minute.
KillTimeout time.Duration
// Stats is optional. If provided, it will be used to record various metrics.
Stats stats.Client
// Clock allows for testing timing related functionality. Do not specify this
// in production code.
Clock clock.Clock
}
// Serve provides the low-level API which is useful if you're creating your own
// net.Listener.
func (h HTTP) Serve(s *http.Server, l net.Listener) Server {
stopTimeout := h.StopTimeout
if stopTimeout == 0 {
stopTimeout = defaultStopTimeout
}
killTimeout := h.KillTimeout
if killTimeout == 0 {
killTimeout = defaultKillTimeout
}
klock := h.Clock
if klock == nil {
klock = clock.New()
}
ss := &server{
stopTimeout: stopTimeout,
killTimeout: killTimeout,
stats: h.Stats,
clock: klock,
oldConnState: s.ConnState,
listener: l,
server: s,
serveDone: make(chan struct{}),
serveErr: make(chan error, 1),
new: make(chan net.Conn),
active: make(chan net.Conn),
idle: make(chan net.Conn),
closed: make(chan net.Conn),
stop: make(chan chan struct{}),
kill: make(chan chan struct{}),
}
s.ConnState = ss.connState
go ss.manage()
go ss.serve()
return ss
}
// ListenAndServe returns a Server for the given http.Server. It is equivalent
// to ListendAndServe from the standard library, but returns immediately.
// Requests will be accepted in a background goroutine. If the http.Server has
// a non-nil TLSConfig, a TLS enabled listener will be setup.
func (h HTTP) ListenAndServe(s *http.Server) (Server, error) {
addr := s.Addr
if addr == "" {
if s.TLSConfig == nil {
addr = ":http"
} else {
addr = ":https"
}
}
l, err := net.Listen("tcp", addr)
if err != nil {
stats.BumpSum(h.Stats, "listen.error", 1)
return nil, err
}
if s.TLSConfig != nil {
l = tls.NewListener(l, s.TLSConfig)
}
return h.Serve(s, l), nil
}
// server manages the serving process and allows for gracefully stopping it.
type server struct {
stopTimeout time.Duration
killTimeout time.Duration
stats stats.Client
clock clock.Clock
oldConnState func(net.Conn, http.ConnState)
server *http.Server
serveDone chan struct{}
serveErr chan error
listener net.Listener
new chan net.Conn
active chan net.Conn
idle chan net.Conn
closed chan net.Conn
stop chan chan struct{}
kill chan chan struct{}
stopOnce sync.Once
stopErr error
}
func (s *server) connState(c net.Conn, cs http.ConnState) {
if s.oldConnState != nil {
s.oldConnState(c, cs)
}
switch cs {
case http.StateNew:
s.new <- c
case http.StateActive:
s.active <- c
case http.StateIdle:
s.idle <- c
case http.StateHijacked, http.StateClosed:
s.closed <- c
}
}
func (s *server) manage() {
defer func() {
close(s.new)
close(s.active)
close(s.idle)
close(s.closed)
close(s.stop)
close(s.kill)
}()
var stopDone chan struct{}
conns := map[net.Conn]http.ConnState{}
var countNew, countActive, countIdle float64
// decConn decrements the count associated with the current state of the
// given connection.
decConn := func(c net.Conn) {
switch conns[c] {
default:
panic(fmt.Errorf("unknown existing connection: %s", c))
case http.StateNew:
countNew--
case http.StateActive:
countActive--
case http.StateIdle:
countIdle--
}
}
// setup a ticker to report various values every minute. if we don't have a
// Stats implementation provided, we Stop it so it never ticks.
statsTicker := s.clock.Ticker(time.Minute)
if s.stats == nil {
statsTicker.Stop()
}
for {
select {
case <-statsTicker.C:
// we'll only get here when s.stats is not nil
s.stats.BumpAvg("http-state.new", countNew)
s.stats.BumpAvg("http-state.active", countActive)
s.stats.BumpAvg("http-state.idle", countIdle)
s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle)
case c := <-s.new:
conns[c] = http.StateNew
countNew++
case c := <-s.active:
decConn(c)
countActive++
conns[c] = http.StateActive
case c := <-s.idle:
decConn(c)
countIdle++
conns[c] = http.StateIdle
// if we're already stopping, close it
if stopDone != nil {
c.Close()
}
case c := <-s.closed:
stats.BumpSum(s.stats, "conn.closed", 1)
decConn(c)
delete(conns, c)
// if we're waiting to stop and are all empty, we just closed the last
// connection and we're done.
if stopDone != nil && len(conns) == 0 {
close(stopDone)
return
}
case stopDone = <-s.stop:
// if we're already all empty, we're already done
if len(conns) == 0 {
close(stopDone)
return
}
// close current idle connections right away
for c, cs := range conns {
if cs == http.StateIdle {
c.Close()
}
}
// continue the loop and wait for all the ConnState updates which will
// eventually close(stopDone) and return from this goroutine.
case killDone := <-s.kill:
// force close all connections
stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns)))
for c := range conns {
c.Close()
}
// don't block the kill.
close(killDone)
// continue the loop and we wait for all the ConnState updates and will
// return from this goroutine when we're all done. otherwise we'll try to
// send those ConnState updates on closed channels.
}
}
}
func (s *server) serve() {
stats.BumpSum(s.stats, "serve", 1)
s.serveErr <- s.server.Serve(s.listener)
close(s.serveDone)
close(s.serveErr)
}
func (s *server) Wait() error {
if err := <-s.serveErr; !isUseOfClosedError(err) {
return err
}
return nil
}
func (s *server) Stop() error {
s.stopOnce.Do(func() {
defer stats.BumpTime(s.stats, "stop.time").End()
stats.BumpSum(s.stats, "stop", 1)
// first disable keep-alive for new connections
s.server.SetKeepAlivesEnabled(false)
// then close the listener so new connections can't connect come thru
closeErr := s.listener.Close()
<-s.serveDone
// then trigger the background goroutine to stop and wait for it
stopDone := make(chan struct{})
s.stop <- stopDone
// wait for stop
select {
case <-stopDone:
case <-s.clock.After(s.stopTimeout):
defer stats.BumpTime(s.stats, "kill.time").End()
stats.BumpSum(s.stats, "kill", 1)
// stop timed out, wait for kill
killDone := make(chan struct{})
s.kill <- killDone
select {
case <-killDone:
case <-s.clock.After(s.killTimeout):
// kill timed out, give up
stats.BumpSum(s.stats, "kill.timeout", 1)
}
}
if closeErr != nil && !isUseOfClosedError(closeErr) {
stats.BumpSum(s.stats, "listener.close.error", 1)
s.stopErr = closeErr
}
})
return s.stopErr
}
func isUseOfClosedError(err error) bool {
if err == nil {
return false
}
if opErr, ok := err.(*net.OpError); ok {
err = opErr.Err
}
return err.Error() == "use of closed network connection"
}
// ListenAndServe is a convenience function to serve and wait for a SIGTERM
// or SIGINT before shutting down.
func ListenAndServe(s *http.Server, hd *HTTP) error {
if hd == nil {
hd = &HTTP{}
}
hs, err := hd.ListenAndServe(s)
if err != nil {
return err
}
log.Printf("serving on http://%s/ with pid %d\n", s.Addr, os.Getpid())
waiterr := make(chan error, 1)
go func() {
defer close(waiterr)
waiterr <- hs.Wait()
}()
signals := make(chan os.Signal, 10)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
select {
case err := <-waiterr:
if err != nil {
return err
}
case s := <-signals:
signal.Stop(signals)
log.Printf("signal received: %s\n", s)
if err := hs.Stop(); err != nil {
return err
}
if err := <-waiterr; err != nil {
return err
}
}
log.Println("exiting")
return nil
}

View File

@ -0,0 +1,43 @@
package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"
"github.com/facebookgo/httpdown"
)
func handler(w http.ResponseWriter, r *http.Request) {
duration, err := time.ParseDuration(r.FormValue("duration"))
if err != nil {
http.Error(w, err.Error(), 400)
return
}
fmt.Fprintf(w, "going to sleep %s with pid %d\n", duration, os.Getpid())
w.(http.Flusher).Flush()
time.Sleep(duration)
fmt.Fprintf(w, "slept %s with pid %d\n", duration, os.Getpid())
}
func main() {
server := &http.Server{
Addr: "127.0.0.1:8080",
Handler: http.HandlerFunc(handler),
}
hd := &httpdown.HTTP{
StopTimeout: 10 * time.Second,
KillTimeout: 1 * time.Second,
}
flag.StringVar(&server.Addr, "addr", server.Addr, "http address")
flag.DurationVar(&hd.StopTimeout, "stop-timeout", hd.StopTimeout, "stop timeout")
flag.DurationVar(&hd.KillTimeout, "kill-timeout", hd.KillTimeout, "kill timeout")
flag.Parse()
if err := httpdown.ListenAndServe(server, hd); err != nil {
panic(err)
}
}

View File

@ -0,0 +1,677 @@
package httpdown_test
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"regexp"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/facebookgo/clock"
"github.com/facebookgo/ensure"
"github.com/facebookgo/freeport"
"github.com/facebookgo/httpdown"
"github.com/facebookgo/stats"
)
type onCloseListener struct {
net.Listener
mutex sync.Mutex
onClose chan struct{}
}
func (o *onCloseListener) Close() error {
// Listener is closed twice, once by Grace, and once by the http library, so
// we guard against a double close of the chan.
defer func() {
o.mutex.Lock()
defer o.mutex.Unlock()
if o.onClose != nil {
close(o.onClose)
o.onClose = nil
}
}()
return o.Listener.Close()
}
func NewOnCloseListener(l net.Listener) (net.Listener, chan struct{}) {
c := make(chan struct{})
return &onCloseListener{Listener: l, onClose: c}, c
}
type closeErrListener struct {
net.Listener
err error
}
func (c *closeErrListener) Close() error {
c.Listener.Close()
return c.err
}
type acceptErrListener struct {
net.Listener
err chan error
}
func (c *acceptErrListener) Accept() (net.Conn, error) {
return nil, <-c.err
}
type closeErrConn struct {
net.Conn
unblockClose chan chan struct{}
}
func (c *closeErrConn) Close() error {
ch := <-c.unblockClose
// Close gets called multiple times, but only the first one gets this ch
if ch != nil {
defer close(ch)
}
return c.Conn.Close()
}
type closeErrConnListener struct {
net.Listener
unblockClose chan chan struct{}
}
func (l *closeErrConnListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return c, err
}
return &closeErrConn{Conn: c, unblockClose: l.unblockClose}, nil
}
func TestHTTPStopWithNoRequest(t *testing.T) {
t.Parallel()
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
statsDone := make(chan struct{}, 2)
hc := &stats.HookClient{
BumpSumHook: func(key string, val float64) {
if key == "serve" && val == 1 {
statsDone <- struct{}{}
}
if key == "stop" && val == 1 {
statsDone <- struct{}{}
}
},
}
server := &http.Server{}
down := &httpdown.HTTP{Stats: hc}
s := down.Serve(server, listener)
ensure.Nil(t, s.Stop())
<-statsDone
<-statsDone
}
func TestHTTPStopWithFinishedRequest(t *testing.T) {
t.Parallel()
hello := []byte("hello")
fin := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(fin)
w.Write(hello)
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, hello)
ensure.Nil(t, res.Body.Close())
// At this point the request is finished, and the connection should be alive
// but idle (because we have keep alive enabled by default in our Transport).
ensure.Nil(t, s.Stop())
<-fin
ensure.Nil(t, s.Wait())
}
func TestHTTPStopWithActiveRequest(t *testing.T) {
t.Parallel()
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.WriteHeader(200)
for i := 0; i < count; i++ {
w.Write(hello)
}
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count))
ensure.Nil(t, res.Body.Close())
<-finOkHandler
<-finStop
}
func TestNewRequestAfterStop(t *testing.T) {
t.Parallel()
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
unblockOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.WriteHeader(200)
const diff = 500
for i := 0; i < count-diff; i++ {
w.Write(hello)
}
<-unblockOkHandler
for i := 0; i < diff; i++ {
w.Write(hello)
}
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
listener, onClose := NewOnCloseListener(listener)
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
// Wait until the listener is closed.
<-onClose
// Now the next request should not be able to connect as the listener is
// now closed.
_, err = client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
// We should just get "connection refused" here, but sometimes, very rarely,
// we get a "connection reset" instead. Unclear why this happens.
ensure.Err(t, err, regexp.MustCompile("(connection refused|connection reset by peer)$"))
// Unblock the handler and ensure we finish writing the rest of the body
// successfully.
close(unblockOkHandler)
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count))
ensure.Nil(t, res.Body.Close())
<-finOkHandler
<-finStop
}
func TestHTTPListenerCloseError(t *testing.T) {
t.Parallel()
expectedError := errors.New("foo")
listener, err := net.Listen("tcp", "127.0.0.1:0")
listener = &closeErrListener{Listener: listener, err: expectedError}
ensure.Nil(t, err)
server := &http.Server{}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
ensure.DeepEqual(t, s.Stop(), expectedError)
}
func TestHTTPServeError(t *testing.T) {
t.Parallel()
expectedError := errors.New("foo")
listener, err := net.Listen("tcp", "127.0.0.1:0")
errChan := make(chan error)
listener = &acceptErrListener{Listener: listener, err: errChan}
ensure.Nil(t, err)
server := &http.Server{}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
errChan <- expectedError
ensure.DeepEqual(t, s.Wait(), expectedError)
ensure.Nil(t, s.Stop())
}
func TestHTTPWithinStopTimeout(t *testing.T) {
t.Parallel()
hello := []byte("hello")
finOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.WriteHeader(200)
w.Write(hello)
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{StopTimeout: time.Minute}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, hello)
ensure.Nil(t, res.Body.Close())
<-finOkHandler
<-finStop
}
func TestHTTPStopTimeoutMissed(t *testing.T) {
t.Parallel()
klock := clock.NewMock()
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
unblockOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count))
w.WriteHeader(200)
for i := 0; i < count/2; i++ {
w.Write(hello)
}
<-unblockOkHandler
for i := 0; i < count/2; i++ {
w.Write(hello)
}
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{
StopTimeout: time.Minute,
Clock: klock,
}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After
klock.Add(down.StopTimeout)
_, err = ioutil.ReadAll(res.Body)
ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$"))
ensure.Nil(t, res.Body.Close())
close(unblockOkHandler)
<-finOkHandler
<-finStop
}
func TestHTTPKillTimeout(t *testing.T) {
t.Parallel()
klock := clock.NewMock()
statsDone := make(chan struct{}, 1)
hc := &stats.HookClient{
BumpSumHook: func(key string, val float64) {
if key == "kill" && val == 1 {
statsDone <- struct{}{}
}
},
}
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
unblockOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count))
w.WriteHeader(200)
for i := 0; i < count/2; i++ {
w.Write(hello)
}
<-unblockOkHandler
for i := 0; i < count/2; i++ {
w.Write(hello)
}
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{
StopTimeout: time.Minute,
KillTimeout: time.Minute,
Stats: hc,
Clock: klock,
}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After
klock.Add(down.StopTimeout)
_, err = ioutil.ReadAll(res.Body)
ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$"))
ensure.Nil(t, res.Body.Close())
close(unblockOkHandler)
<-finOkHandler
<-finStop
<-statsDone
}
func TestHTTPKillTimeoutMissed(t *testing.T) {
t.Parallel()
klock := clock.NewMock()
statsDone := make(chan struct{}, 1)
hc := &stats.HookClient{
BumpSumHook: func(key string, val float64) {
if key == "kill.timeout" && val == 1 {
statsDone <- struct{}{}
}
},
}
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
unblockOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count))
w.WriteHeader(200)
for i := 0; i < count/2; i++ {
w.Write(hello)
}
<-unblockOkHandler
for i := 0; i < count/2; i++ {
w.Write(hello)
}
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
unblockConnClose := make(chan chan struct{}, 1)
listener = &closeErrConnListener{
Listener: listener,
unblockClose: unblockConnClose,
}
server := &http.Server{Handler: http.HandlerFunc(okHandler)}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{
StopTimeout: time.Minute,
KillTimeout: time.Minute,
Stats: hc,
Clock: klock,
}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
// Start the Stop process.
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After
klock.Add(down.StopTimeout) // trigger stop timeout
klock.Wait(clock.Calls{After: 2}) // wait for Kill to call After
klock.Add(down.KillTimeout) // trigger kill timeout
// We hit both the StopTimeout & the KillTimeout.
<-finStop
// Then we unblock the Close, so we get an unexpected EOF since we close
// before we finish writing the response.
connCloseDone := make(chan struct{})
unblockConnClose <- connCloseDone
<-connCloseDone
close(unblockConnClose)
// Then we unblock the handler which tries to write the rest of the data.
close(unblockOkHandler)
_, err = ioutil.ReadAll(res.Body)
ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$"))
ensure.Nil(t, res.Body.Close())
<-finOkHandler
<-statsDone
}
func TestDoubleStop(t *testing.T) {
t.Parallel()
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
ensure.Nil(t, s.Stop())
ensure.Nil(t, s.Stop())
}
func TestExistingConnState(t *testing.T) {
t.Parallel()
hello := []byte("hello")
fin := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(fin)
w.Write(hello)
}
var called int32
listener, err := net.Listen("tcp", "127.0.0.1:0")
ensure.Nil(t, err)
server := &http.Server{
Handler: http.HandlerFunc(okHandler),
ConnState: func(c net.Conn, s http.ConnState) {
atomic.AddInt32(&called, 1)
},
}
transport := &http.Transport{}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{}
s := down.Serve(server, listener)
res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String()))
ensure.Nil(t, err)
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, hello)
ensure.Nil(t, res.Body.Close())
ensure.Nil(t, s.Stop())
<-fin
ensure.True(t, atomic.LoadInt32(&called) > 0)
}
func TestHTTPDefaultListenError(t *testing.T) {
if os.Getuid() == 0 {
t.Skip("cant run this test as root")
}
statsDone := make(chan struct{}, 1)
hc := &stats.HookClient{
BumpSumHook: func(key string, val float64) {
if key == "listen.error" && val == 1 {
statsDone <- struct{}{}
}
},
}
t.Parallel()
down := &httpdown.HTTP{Stats: hc}
_, err := down.ListenAndServe(&http.Server{})
ensure.Err(t, err, regexp.MustCompile("listen tcp :80: bind: permission denied"))
<-statsDone
}
func TestHTTPSDefaultListenError(t *testing.T) {
if os.Getuid() == 0 {
t.Skip("cant run this test as root")
}
t.Parallel()
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Fatalf("error loading cert: %v", err)
}
down := &httpdown.HTTP{}
_, err = down.ListenAndServe(&http.Server{
TLSConfig: &tls.Config{
NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{cert},
},
})
ensure.Err(t, err, regexp.MustCompile("listen tcp :443: bind: permission denied"))
}
func TestTLS(t *testing.T) {
t.Parallel()
port, err := freeport.Get()
ensure.Nil(t, err)
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Fatalf("error loading cert: %v", err)
}
const count = 10000
hello := []byte("hello")
finOkHandler := make(chan struct{})
okHandler := func(w http.ResponseWriter, r *http.Request) {
defer close(finOkHandler)
w.WriteHeader(200)
for i := 0; i < count; i++ {
w.Write(hello)
}
}
server := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: http.HandlerFunc(okHandler),
TLSConfig: &tls.Config{
NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{cert},
},
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
client := &http.Client{Transport: transport}
down := &httpdown.HTTP{}
s, err := down.ListenAndServe(server)
ensure.Nil(t, err)
res, err := client.Get(fmt.Sprintf("https://%s/", server.Addr))
ensure.Nil(t, err)
finStop := make(chan struct{})
go func() {
defer close(finStop)
ensure.Nil(t, s.Stop())
}()
actualBody, err := ioutil.ReadAll(res.Body)
ensure.Nil(t, err)
ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count))
ensure.Nil(t, res.Body.Close())
<-finOkHandler
<-finStop
}
// localhostCert is a PEM-encoded TLS cert with SAN IPs
// "127.0.0.1" and "[::1]", expiring at the last second of 2049 (the end
// of ASN.1 time).
// generated from src/pkg/crypto/tls:
// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
MIIBdzCCASOgAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD
bzAeFw03MDAxMDEwMDAwMDBaFw00OTEyMzEyMzU5NTlaMBIxEDAOBgNVBAoTB0Fj
bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBALyCfqwwip8BvTKgVKGdmjZTU8DD
ndR+WALmFPIRqn89bOU3s30olKiqYEju/SFoEvMyFRT/TWEhXHDaufThqaMCAwEA
AaNoMGYwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud
EwEB/wQFMAMBAf8wLgYDVR0RBCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAA
AAAAAAAAAAAAAAEwCwYJKoZIhvcNAQEFA0EAr/09uy108p51rheIOSnz4zgduyTl
M+4AmRo8/U1twEZLgfAGG/GZjREv2y4mCEUIM3HebCAqlA5jpRg76Rf8jw==
-----END CERTIFICATE-----`)
// localhostKey is the private key for localhostCert.
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBOQIBAAJBALyCfqwwip8BvTKgVKGdmjZTU8DDndR+WALmFPIRqn89bOU3s30o
lKiqYEju/SFoEvMyFRT/TWEhXHDaufThqaMCAwEAAQJAPXuWUxTV8XyAt8VhNQER
LgzJcUKb9JVsoS1nwXgPksXnPDKnL9ax8VERrdNr+nZbj2Q9cDSXBUovfdtehcdP
qQIhAO48ZsPylbTrmtjDEKiHT2Ik04rLotZYS2U873J6I7WlAiEAypDjYxXyafv/
Yo1pm9onwcetQKMW8CS3AjuV9Axzj6cCIEx2Il19fEMG4zny0WPlmbrcKvD/DpJQ
4FHrzsYlIVTpAiAas7S1uAvneqd0l02HlN9OxQKKlbUNXNme+rnOnOGS2wIgS0jW
zl1jvrOSJeP1PpAHohWz6LOhEr8uvltWkN6x3vE=
-----END RSA PRIVATE KEY-----`)

View File

@ -0,0 +1,30 @@
BSD License
For httpdown software
Copyright (c) 2015, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,33 @@
Additional Grant of Patent Rights Version 2
"Software" means the httpdown software distributed by Facebook, Inc.
Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software
("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable
(subject to the termination provision below) license under any Necessary
Claims, to make, have made, use, sell, offer to sell, import, and otherwise
transfer the Software. For avoidance of doubt, no license is granted under
Facebooks rights in any patent claims that are infringed by (i) modifications
to the Software made by you or any third party or (ii) the Software in
combination with any software or other technology.
The license granted hereunder will terminate, automatically and without notice,
if you (or any of your subsidiaries, corporate affiliates or agents) initiate
directly or indirectly, or take a direct financial interest in, any Patent
Assertion: (i) against Facebook or any of its subsidiaries or corporate
affiliates, (ii) against any party if such Patent Assertion arises in whole or
in part from any software, technology, product or service of Facebook or any of
its subsidiaries or corporate affiliates, or (iii) against any party relating
to the Software. Notwithstanding the foregoing, if Facebook or any of its
subsidiaries or corporate affiliates files a lawsuit alleging patent
infringement against you in the first instance, and you respond by filing a
patent infringement counterclaim in that lawsuit against that party that is
unrelated to the Software, the license granted hereunder will not terminate
under section (i) of this paragraph due to such counterclaim.
A "Necessary Claim" is a claim of a patent owned by Facebook that is
necessarily infringed by the Software standing alone.
A "Patent Assertion" is any lawsuit or other action alleging direct, indirect,
or contributory infringement or inducement to infringe any patent, including a
cross-claim or counterclaim.

View File

@ -0,0 +1,41 @@
httpdown [![Build Status](https://secure.travis-ci.org/facebookgo/httpdown.png)](https://travis-ci.org/facebookgo/httpdown)
========
Documentation: https://godoc.org/github.com/facebookgo/httpdown
Package httpdown provides a library that makes it easy to build a HTTP server
that can be shutdown gracefully (that is, without dropping any connections).
If you want graceful restart and not just graceful shutdown, look at the
[grace](https://github.com/facebookgo/grace) package which uses this package
underneath but also provides graceful restart.
Usage
-----
Demo HTTP Server with graceful termination:
https://github.com/facebookgo/httpdown/blob/master/httpdown_example/main.go
1. Install the demo application
go get github.com/facebookgo/httpdown/httpdown_example
1. Start it in the first terminal
httpdown_example
This will output something like:
2014/11/18 21:57:50 serving on http://127.0.0.1:8080/ with pid 17
1. In a second terminal start a slow HTTP request
curl 'http://localhost:8080/?duration=20s'
1. In a third terminal trigger a graceful shutdown (using the pid from your output):
kill -TERM 17
This will demonstrate that the slow request was served before the server was
shutdown. You could also have used `Ctrl-C` instead of `kill` as the example
application triggers graceful shutdown on TERM or INT signals.

View File

@ -0,0 +1,24 @@
language: go
go:
- 1.2
- 1.3
matrix:
fast_finish: true
before_install:
- go get -v code.google.com/p/go.tools/cmd/vet
- go get -v github.com/golang/lint/golint
- go get -v code.google.com/p/go.tools/cmd/cover
install:
- go install -race -v std
- go get -race -t -v ./...
- go install -race -v ./...
script:
- go vet ./...
- $HOME/gopath/bin/golint .
- go test -cpu=2 -race -v ./...
- go test -cpu=2 -covermode=atomic ./...

View File

@ -0,0 +1,30 @@
BSD License
For stats software
Copyright (c) 2015, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,33 @@
Additional Grant of Patent Rights Version 2
"Software" means the stats software distributed by Facebook, Inc.
Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software
("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable
(subject to the termination provision below) license under any Necessary
Claims, to make, have made, use, sell, offer to sell, import, and otherwise
transfer the Software. For avoidance of doubt, no license is granted under
Facebooks rights in any patent claims that are infringed by (i) modifications
to the Software made by you or any third party or (ii) the Software in
combination with any software or other technology.
The license granted hereunder will terminate, automatically and without notice,
if you (or any of your subsidiaries, corporate affiliates or agents) initiate
directly or indirectly, or take a direct financial interest in, any Patent
Assertion: (i) against Facebook or any of its subsidiaries or corporate
affiliates, (ii) against any party if such Patent Assertion arises in whole or
in part from any software, technology, product or service of Facebook or any of
its subsidiaries or corporate affiliates, or (iii) against any party relating
to the Software. Notwithstanding the foregoing, if Facebook or any of its
subsidiaries or corporate affiliates files a lawsuit alleging patent
infringement against you in the first instance, and you respond by filing a
patent infringement counterclaim in that lawsuit against that party that is
unrelated to the Software, the license granted hereunder will not terminate
under section (i) of this paragraph due to such counterclaim.
A "Necessary Claim" is a claim of a patent owned by Facebook that is
necessarily infringed by the Software standing alone.
A "Patent Assertion" is any lawsuit or other action alleging direct, indirect,
or contributory infringement or inducement to infringe any patent, including a
cross-claim or counterclaim.

View File

@ -0,0 +1,4 @@
stats [![Build Status](https://secure.travis-ci.org/facebookgo/stats.png)](https://travis-ci.org/facebookgo/stats)
=====
Documentation: https://godoc.org/github.com/facebookgo/stats

View File

@ -0,0 +1,166 @@
// Package stats defines a lightweight interface for collecting statistics. It
// doesn't provide an implementation, just the shared interface.
package stats
// Client provides methods to collection statistics.
type Client interface {
// BumpAvg bumps the average for the given key.
BumpAvg(key string, val float64)
// BumpSum bumps the sum for the given key.
BumpSum(key string, val float64)
// BumpHistogram bumps the histogram for the given key.
BumpHistogram(key string, val float64)
// BumpTime is a special version of BumpHistogram which is specialized for
// timers. Calling it starts the timer, and it returns a value on which End()
// can be called to indicate finishing the timer. A convenient way of
// recording the duration of a function is calling it like such at the top of
// the function:
//
// defer s.BumpTime("my.function").End()
BumpTime(key string) interface {
End()
}
}
// PrefixClient adds multiple keys for the same value, with each prefix
// added to the key and calls the underlying client.
func PrefixClient(prefixes []string, client Client) Client {
return &prefixClient{
Prefixes: prefixes,
Client: client,
}
}
type prefixClient struct {
Prefixes []string
Client Client
}
func (p *prefixClient) BumpAvg(key string, val float64) {
for _, prefix := range p.Prefixes {
p.Client.BumpAvg(prefix+key, val)
}
}
func (p *prefixClient) BumpSum(key string, val float64) {
for _, prefix := range p.Prefixes {
p.Client.BumpSum(prefix+key, val)
}
}
func (p *prefixClient) BumpHistogram(key string, val float64) {
for _, prefix := range p.Prefixes {
p.Client.BumpHistogram(prefix+key, val)
}
}
func (p *prefixClient) BumpTime(key string) interface {
End()
} {
var m multiEnder
for _, prefix := range p.Prefixes {
m = append(m, p.Client.BumpTime(prefix+key))
}
return m
}
// multiEnder combines many enders together.
type multiEnder []interface {
End()
}
func (m multiEnder) End() {
for _, e := range m {
e.End()
}
}
// HookClient is useful for testing. It provides optional hooks for each
// expected method in the interface, which if provided will be called. If a
// hook is not provided, it will be ignored.
type HookClient struct {
BumpAvgHook func(key string, val float64)
BumpSumHook func(key string, val float64)
BumpHistogramHook func(key string, val float64)
BumpTimeHook func(key string) interface {
End()
}
}
// BumpAvg will call BumpAvgHook if defined.
func (c *HookClient) BumpAvg(key string, val float64) {
if c.BumpAvgHook != nil {
c.BumpAvgHook(key, val)
}
}
// BumpSum will call BumpSumHook if defined.
func (c *HookClient) BumpSum(key string, val float64) {
if c.BumpSumHook != nil {
c.BumpSumHook(key, val)
}
}
// BumpHistogram will call BumpHistogramHook if defined.
func (c *HookClient) BumpHistogram(key string, val float64) {
if c.BumpHistogramHook != nil {
c.BumpHistogramHook(key, val)
}
}
// BumpTime will call BumpTimeHook if defined.
func (c *HookClient) BumpTime(key string) interface {
End()
} {
if c.BumpTimeHook != nil {
return c.BumpTimeHook(key)
}
return NoOpEnd
}
type noOpEnd struct{}
func (n noOpEnd) End() {}
// NoOpEnd provides a dummy value for use in tests as valid return value for
// BumpTime().
var NoOpEnd = noOpEnd{}
// BumpAvg calls BumpAvg on the Client if it isn't nil. This is useful when a
// component has an optional stats.Client.
func BumpAvg(c Client, key string, val float64) {
if c != nil {
c.BumpAvg(key, val)
}
}
// BumpSum calls BumpSum on the Client if it isn't nil. This is useful when a
// component has an optional stats.Client.
func BumpSum(c Client, key string, val float64) {
if c != nil {
c.BumpSum(key, val)
}
}
// BumpHistogram calls BumpHistogram on the Client if it isn't nil. This is
// useful when a component has an optional stats.Client.
func BumpHistogram(c Client, key string, val float64) {
if c != nil {
c.BumpHistogram(key, val)
}
}
// BumpTime calls BumpTime on the Client if it isn't nil. If the Client is nil
// it still returns a valid return value which will be a no-op. This is useful
// when a component has an optional stats.Client.
func BumpTime(c Client, key string) interface {
End()
} {
if c != nil {
return c.BumpTime(key)
}
return NoOpEnd
}

View File

@ -0,0 +1,77 @@
package stats_test
import (
"testing"
"github.com/facebookgo/ensure"
"github.com/facebookgo/stats"
)
// Ensure calling End works even when a BumpTimeHook isn't provided.
func TestHookClientBumpTime(t *testing.T) {
(&stats.HookClient{}).BumpTime("foo").End()
}
func TestPrefixClient(t *testing.T) {
const (
prefix1 = "prefix1"
prefix2 = "prefix2"
avgKey = "avg"
avgVal = float64(1)
sumKey = "sum"
sumVal = float64(2)
histogramKey = "histogram"
histogramVal = float64(3)
timeKey = "time"
)
var keys []string
hc := &stats.HookClient{
BumpAvgHook: func(key string, val float64) {
keys = append(keys, key)
ensure.DeepEqual(t, val, avgVal)
},
BumpSumHook: func(key string, val float64) {
keys = append(keys, key)
ensure.DeepEqual(t, val, sumVal)
},
BumpHistogramHook: func(key string, val float64) {
keys = append(keys, key)
ensure.DeepEqual(t, val, histogramVal)
},
BumpTimeHook: func(key string) interface {
End()
} {
return multiEnderTest{
EndHook: func() {
keys = append(keys, key)
},
}
},
}
pc := stats.PrefixClient([]string{prefix1, prefix2}, hc)
pc.BumpAvg(avgKey, avgVal)
pc.BumpSum(sumKey, sumVal)
pc.BumpHistogram(histogramKey, histogramVal)
pc.BumpTime(timeKey).End()
ensure.SameElements(t, keys, []string{
prefix1 + avgKey,
prefix1 + sumKey,
prefix1 + histogramKey,
prefix1 + timeKey,
prefix2 + avgKey,
prefix2 + sumKey,
prefix2 + histogramKey,
prefix2 + timeKey,
})
}
type multiEnderTest struct {
EndHook func()
}
func (e multiEnderTest) End() {
e.EndHook()
}

View File

@ -2,9 +2,7 @@ package main
import (
"os"
"os/signal"
"os/user"
"syscall"
"github.com/minio/cli"
"github.com/minio/minio/pkg/controller"
@ -70,32 +68,13 @@ func getServerConfig(c *cli.Context) api.Config {
}
}
func trapServer(doneCh chan struct{}) {
// Go signal notification works by sending `os.Signal`
// values on a channel.
sigs := make(chan os.Signal, 1)
// `signal.Notify` registers the given channel to
// receive notifications of the specified signals.
signal.Notify(sigs, syscall.SIGHUP, syscall.SIGUSR2)
// This executes a blocking receive for signals.
// When it gets one it'll then notify the program
// that it can finish.
<-sigs
doneCh <- struct{}{}
}
func runServer(c *cli.Context) {
_, err := user.Current()
if err != nil {
Fatalf("Unable to determine current user. Reason: %s\n", err)
}
doneCh := make(chan struct{})
go trapServer(doneCh)
apiServerConfig := getServerConfig(c)
err = server.StartServices(apiServerConfig, doneCh)
err = server.StartServices(apiServerConfig)
if err != nil {
Fatalln(err)
}

View File

@ -85,8 +85,6 @@ func (r *Cache) SetMaxSize(maxSize uint64) {
// Stats get current cache statistics
func (r *Cache) Stats() Stats {
r.Lock()
defer r.Unlock()
return Stats{
Bytes: r.currentSize,
Items: r.items.Len(),

View File

@ -25,7 +25,6 @@ import (
"io"
"io/ioutil"
"log"
"reflect"
"runtime/debug"
"sort"
"strconv"
@ -113,35 +112,21 @@ func New() (Interface, error) {
return nil, iodine.New(err, nil)
}
for k, v := range buckets {
a.storedBuckets.Set(k, v)
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.bucketMetadata = v
a.storedBuckets.Set(k, newBucket)
}
}
return a, nil
}
// updateConfig loads new config everytime
func (donut API) updateConfig() {
// on error loading config's just return do not modify
conf, err := LoadConfig()
if err != nil {
return
}
if reflect.DeepEqual(donut.config, conf) {
return
}
if conf.MaxSize == donut.config.MaxSize {
return
}
donut.config = conf
donut.objects.SetMaxSize(conf.MaxSize)
}
// GetObject - GET object from cache buffer
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -188,8 +173,6 @@ func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, er
func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
errParams := map[string]string{
"bucket": bucket,
@ -246,8 +229,6 @@ func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, len
func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -271,8 +252,6 @@ func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) {
func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -314,8 +293,6 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, data io.Reader, metadata map[string]string) (ObjectMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
contentType := metadata["contentType"]
objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
@ -429,8 +406,6 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
func (donut API) MakeBucket(bucketName, acl string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if donut.storedBuckets.Stats().Items == totalBuckets {
return iodine.New(TooManyBuckets{Bucket: bucketName}, nil)
@ -470,8 +445,6 @@ func (donut API) MakeBucket(bucketName, acl string) error {
func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -566,8 +539,6 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
func (donut API) ListBuckets() ([]BucketMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
var results []BucketMetadata
if len(donut.config.NodeDiskMap) > 0 {
@ -592,8 +563,6 @@ func (donut API) ListBuckets() ([]BucketMetadata, error) {
func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
// check if bucket exists
if !IsValidBucket(bucket) {

View File

@ -22,6 +22,8 @@ import (
"encoding/base64"
"encoding/hex"
"io/ioutil"
"os"
"path/filepath"
"testing"
. "github.com/minio/check"
@ -29,14 +31,20 @@ import (
func TestCache(t *testing.T) { TestingT(t) }
type MyCacheSuite struct{}
type MyCacheSuite struct {
root string
}
var _ = Suite(&MyCacheSuite{})
var dc Interface
func (s *MyCacheSuite) SetUpSuite(c *C) {
var err error
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
s.root = root
customConfigPath = filepath.Join(root, "donut.json")
dc, err = New()
c.Assert(err, IsNil)
@ -46,6 +54,10 @@ func (s *MyCacheSuite) SetUpSuite(c *C) {
c.Assert(len(buckets), Equals, 0)
}
func (s *MyCacheSuite) TearDownSuite(c *C) {
os.RemoveAll(s.root)
}
// test make bucket without name
func (s *MyCacheSuite) TestBucketWithoutNameFails(c *C) {
// fail to create new bucket without a name

View File

@ -38,8 +38,6 @@ import (
func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -72,8 +70,6 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
@ -93,8 +89,6 @@ func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
// possible free
@ -201,8 +195,6 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
donut.lock.Lock()
// update Config if possible
donut.updateConfig()
if !IsValidBucket(bucket) {
donut.lock.Unlock()
@ -282,8 +274,6 @@ func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartRe
// TODO handle delimiter
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !donut.storedBuckets.Exists(bucket) {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
@ -347,8 +337,6 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
// Verify upload id
donut.lock.Lock()
defer donut.lock.Unlock()
// update Config if possible
donut.updateConfig()
if !donut.storedBuckets.Exists(bucket) {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)

View File

@ -0,0 +1,30 @@
BSD License
For grace software
Copyright (c) 2015, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

151
pkg/server/nimble/http.go Normal file
View File

@ -0,0 +1,151 @@
// Package nimble provides easy to use graceful restart for a set of HTTP services
//
// This package originally from https://github.com/facebookgo/grace
//
// Re-licensing with Apache License 2.0, with code modifications
package nimble
import (
"crypto/tls"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/facebookgo/httpdown"
"github.com/minio/minio/pkg/iodine"
)
var (
inheritedListeners = os.Getenv("LISTEN_FDS")
ppid = os.Getppid()
)
// An app contains one or more servers and associated configuration.
type app struct {
servers []*http.Server
net *nimbleNet
listeners []net.Listener
sds []httpdown.Server
errors chan error
}
func newApp(servers []*http.Server) *app {
return &app{
servers: servers,
net: &nimbleNet{},
listeners: make([]net.Listener, 0, len(servers)),
sds: make([]httpdown.Server, 0, len(servers)),
errors: make(chan error, 1+(len(servers)*2)),
}
}
func (a *app) listen() error {
for _, s := range a.servers {
l, err := a.net.Listen("tcp", s.Addr)
if err != nil {
return iodine.New(err, nil)
}
if s.TLSConfig != nil {
l = tls.NewListener(l, s.TLSConfig)
}
a.listeners = append(a.listeners, l)
}
return nil
}
func (a *app) serve() {
h := &httpdown.HTTP{}
for i, s := range a.servers {
a.sds = append(a.sds, h.Serve(s, a.listeners[i]))
}
}
func (a *app) wait() {
var wg sync.WaitGroup
wg.Add(len(a.sds) * 2) // Wait & Stop
go a.signalHandler(&wg)
for _, s := range a.sds {
go func(s httpdown.Server) {
defer wg.Done()
if err := s.Wait(); err != nil {
a.errors <- iodine.New(err, nil)
}
}(s)
}
wg.Wait()
}
func (a *app) term(wg *sync.WaitGroup) {
for _, s := range a.sds {
go func(s httpdown.Server) {
defer wg.Done()
if err := s.Stop(); err != nil {
a.errors <- iodine.New(err, nil)
}
}(s)
}
}
func (a *app) signalHandler(wg *sync.WaitGroup) {
ch := make(chan os.Signal, 10)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGHUP)
for {
sig := <-ch
switch sig {
case syscall.SIGTERM:
// this ensures a subsequent TERM will trigger standard go behaviour of
// terminating.
signal.Stop(ch)
a.term(wg)
return
case syscall.SIGUSR2:
fallthrough
case syscall.SIGHUP:
// we only return here if there's an error, otherwise the new process
// will send us a TERM when it's ready to trigger the actual shutdown.
if _, err := a.net.StartProcess(); err != nil {
a.errors <- iodine.New(err, nil)
}
}
}
}
// ListenAndServe will serve the given http.Servers and will monitor for signals
// allowing for graceful termination (SIGTERM) or restart (SIGUSR2/SIGHUP).
func ListenAndServe(servers ...*http.Server) error {
a := newApp(servers)
// Acquire Listeners
if err := a.listen(); err != nil {
return iodine.New(err, nil)
}
// Start serving.
a.serve()
// Close the parent if we inherited and it wasn't init that started us.
if inheritedListeners != "" && ppid != 1 {
if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil {
return iodine.New(err, nil)
}
}
waitdone := make(chan struct{})
go func() {
defer close(waitdone)
a.wait()
}()
select {
case err := <-a.errors:
if err == nil {
panic("unexpected nil error")
}
return iodine.New(err, nil)
case <-waitdone:
return nil
}
}

248
pkg/server/nimble/net.go Normal file
View File

@ -0,0 +1,248 @@
package nimble
import (
"fmt"
"net"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"github.com/minio/minio/pkg/iodine"
)
// This package originally from https://github.com/facebookgo/grace
//
// Re-licensing with Apache License 2.0, with code modifications
// This package provides a family of Listen functions that either open a
// fresh connection or provide an inherited connection from when the process
// was started. This behaves like their counterparts in the net pacakge, but
// transparently provide support for graceful restarts without dropping
// connections. This is provided in a systemd socket activation compatible form
// to allow using socket activation.
//
const (
// Used to indicate a graceful restart in the new process.
envCountKey = "LISTEN_FDS" // similar to systemd SDS_LISTEN_FDS
envCountKeyPrefix = envCountKey + "="
)
// In order to keep the working directory the same as when we started we record
// it at startup.
var originalWD, _ = os.Getwd()
// nimbleNet provides the family of Listen functions and maintains the associated
// state. Typically you will have only once instance of nimbleNet per application.
type nimbleNet struct {
inherited []net.Listener
active []net.Listener
mutex sync.Mutex
inheritOnce sync.Once
}
func (n *nimbleNet) inherit() error {
var retErr error
n.inheritOnce.Do(func() {
n.mutex.Lock()
defer n.mutex.Unlock()
countStr := os.Getenv(envCountKey)
if countStr == "" {
return
}
count, err := strconv.Atoi(countStr)
if err != nil {
retErr = fmt.Errorf("found invalid count value: %s=%s", envCountKey, countStr)
return
}
fdStart := 3
for i := fdStart; i < fdStart+count; i++ {
file := os.NewFile(uintptr(i), "listener")
l, err := net.FileListener(file)
if err != nil {
file.Close()
retErr = fmt.Errorf("error inheriting socket fd %d: %s", i, err)
return
}
if err := file.Close(); err != nil {
retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err)
return
}
n.inherited = append(n.inherited, l)
}
})
return iodine.New(retErr, nil)
}
// Listen announces on the local network address laddr. The network net must be
// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It
// returns an inherited net.Listener for the matching network and address, or
// creates a new one using net.Listen.
func (n *nimbleNet) Listen(nett, laddr string) (net.Listener, error) {
switch nett {
default:
return nil, net.UnknownNetworkError(nett)
case "tcp", "tcp4", "tcp6":
addr, err := net.ResolveTCPAddr(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
return n.ListenTCP(nett, addr)
case "unix", "unixpacket", "invalid_unix_net_for_test":
addr, err := net.ResolveUnixAddr(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
return n.ListenUnix(nett, addr)
}
}
// ListenTCP announces on the local network address laddr. The network net must
// be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the
// matching network and address, or creates a new one using net.ListenTCP.
func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) {
if err := n.inherit(); err != nil {
return nil, iodine.New(err, nil)
}
n.mutex.Lock()
defer n.mutex.Unlock()
// look for an inherited listener
for i, l := range n.inherited {
if l == nil { // we nil used inherited listeners
continue
}
if isSameAddr(l.Addr(), laddr) {
n.inherited[i] = nil
n.active = append(n.active, l)
return l.(*net.TCPListener), nil
}
}
// make a fresh listener
l, err := net.ListenTCP(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
n.active = append(n.active, l)
return l, nil
}
// ListenUnix announces on the local network address laddr. The network net
// must be a: "unix" or "unixpacket". It returns an inherited net.Listener for
// the matching network and address, or creates a new one using net.ListenUnix.
func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) {
if err := n.inherit(); err != nil {
return nil, iodine.New(err, nil)
}
n.mutex.Lock()
defer n.mutex.Unlock()
// look for an inherited listener
for i, l := range n.inherited {
if l == nil { // we nil used inherited listeners
continue
}
if isSameAddr(l.Addr(), laddr) {
n.inherited[i] = nil
n.active = append(n.active, l)
return l.(*net.UnixListener), nil
}
}
// make a fresh listener
l, err := net.ListenUnix(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
n.active = append(n.active, l)
return l, nil
}
// activeListeners returns a snapshot copy of the active listeners.
func (n *nimbleNet) activeListeners() ([]net.Listener, error) {
n.mutex.Lock()
defer n.mutex.Unlock()
ls := make([]net.Listener, len(n.active))
copy(ls, n.active)
return ls, nil
}
func isSameAddr(a1, a2 net.Addr) bool {
if a1.Network() != a2.Network() {
return false
}
a1s := a1.String()
a2s := a2.String()
if a1s == a2s {
return true
}
// This allows for ipv6 vs ipv4 local addresses to compare as equal. This
// scenario is common when listening on localhost.
const ipv6prefix = "[::]"
a1s = strings.TrimPrefix(a1s, ipv6prefix)
a2s = strings.TrimPrefix(a2s, ipv6prefix)
const ipv4prefix = "0.0.0.0"
a1s = strings.TrimPrefix(a1s, ipv4prefix)
a2s = strings.TrimPrefix(a2s, ipv4prefix)
return a1s == a2s
}
// StartProcess starts a new process passing it the active listeners. It
// doesn't fork, but starts a new process using the same environment and
// arguments as when it was originally started. This allows for a newly
// deployed binary to be started. It returns the pid of the newly started
// process when successful.
func (n *nimbleNet) StartProcess() (int, error) {
listeners, err := n.activeListeners()
if err != nil {
return 0, iodine.New(err, nil)
}
// Extract the fds from the listeners.
files := make([]*os.File, len(listeners))
for i, l := range listeners {
files[i], err = l.(fileListener).File()
if err != nil {
return 0, iodine.New(err, nil)
}
defer files[i].Close()
}
// Use the original binary location. This works with symlinks such that if
// the file it points to has been changed we will use the updated symlink.
argv0, err := exec.LookPath(os.Args[0])
if err != nil {
return 0, iodine.New(err, nil)
}
// Pass on the environment and replace the old count key with the new one.
var env []string
for _, v := range os.Environ() {
if !strings.HasPrefix(v, envCountKeyPrefix) {
env = append(env, v)
}
}
env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners)))
allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...)
process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{
Dir: originalWD,
Env: env,
Files: allFiles,
})
if err != nil {
return 0, iodine.New(err, nil)
}
return process.Pid, nil
}
type fileListener interface {
File() (*os.File, error)
}

View File

@ -21,16 +21,16 @@ import (
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/server/api"
"github.com/minio/minio/pkg/server/nimble"
)
// Start API listener
func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
defer close(errCh)
// getAPI server instance
func getAPIServer(conf api.Config, apiHandler http.Handler) (*http.Server, error) {
// Minio server config
httpServer := &http.Server{
Addr: conf.Address,
@ -48,14 +48,13 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile)
if err != nil {
errCh <- iodine.New(err, nil)
return nil, iodine.New(err, nil)
}
}
host, port, err := net.SplitHostPort(conf.Address)
if err != nil {
errCh <- iodine.New(err, nil)
return
return nil, iodine.New(err, nil)
}
var hosts []string
@ -65,8 +64,7 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
default:
addrs, err := net.InterfaceAddrs()
if err != nil {
errCh <- iodine.New(err, nil)
return
return nil, iodine.New(err, nil)
}
for _, addr := range addrs {
if addr.Network() == "ip+net" {
@ -80,26 +78,24 @@ func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
for _, host := range hosts {
if conf.TLS {
fmt.Printf("Starting minio server on: https://%s:%s\n", host, port)
fmt.Printf("Starting minio server on: https://%s:%s, PID: %d\n", host, port, os.Getpid())
} else {
fmt.Printf("Starting minio server on: http://%s:%s\n", host, port)
fmt.Printf("Starting minio server on: http://%s:%s, PID: %d\n", host, port, os.Getpid())
}
}
errCh <- httpServer.ListenAndServe()
return httpServer, nil
}
// Start RPC listener
func startRPC(errCh chan error, rpcHandler http.Handler) {
defer close(errCh)
// getRPCServer instance
func getRPCServer(rpcHandler http.Handler) *http.Server {
// Minio server config
httpServer := &http.Server{
Addr: "127.0.0.1:9001", // TODO make this configurable
Handler: rpcHandler,
MaxHeaderBytes: 1 << 20,
}
errCh <- httpServer.ListenAndServe()
return httpServer
}
// Start ticket master
@ -112,21 +108,18 @@ func startTM(a api.Minio) {
}
// StartServices starts basic services for a server
func StartServices(conf api.Config, doneCh chan struct{}) error {
apiErrCh := make(chan error)
rpcErrCh := make(chan error)
func StartServices(conf api.Config) error {
apiHandler, minioAPI := getAPIHandler(conf)
go startAPI(apiErrCh, conf, apiHandler)
go startRPC(rpcErrCh, getRPCHandler())
apiServer, err := getAPIServer(conf, apiHandler)
if err != nil {
return iodine.New(err, nil)
}
rpcServer := getRPCServer(getRPCHandler())
// start ticket master
go startTM(minioAPI)
select {
case err := <-apiErrCh:
if err := nimble.ListenAndServe(apiServer, rpcServer); err != nil {
return iodine.New(err, nil)
case err := <-rpcErrCh:
return iodine.New(err, nil)
case <-doneCh:
return nil
}
return nil
}