From fe6d3ad64912bac11f199501b8e7904dab0c1826 Mon Sep 17 00:00:00 2001 From: Fernando Barillas Date: Sat, 27 Apr 2019 14:57:21 -0700 Subject: [PATCH] Disable telemetry --- telemetry/collection.go | 178 +------------------------------- telemetry/telemetry.go | 222 +--------------------------------------- 2 files changed, 7 insertions(+), 393 deletions(-) diff --git a/telemetry/collection.go b/telemetry/collection.go index 8b05d17d..f05672f6 100644 --- a/telemetry/collection.go +++ b/telemetry/collection.go @@ -15,11 +15,6 @@ package telemetry import ( - "fmt" - "hash/fnv" - "log" - "strings" - "github.com/google/uuid" ) @@ -36,20 +31,6 @@ import ( // argument will be permanently disabled for the // lifetime of the process. func Init(instanceID uuid.UUID, disabledMetricsKeys []string) { - if enabled { - panic("already initialized") - } - if str := instanceID.String(); str == "" || - str == "00000000-0000-0000-0000-000000000000" { - panic("empty UUID") - } - instanceUUID = instanceID - disabledMetricsMu.Lock() - for _, key := range disabledMetricsKeys { - disabledMetrics[strings.TrimSpace(key)] = false - } - disabledMetricsMu.Unlock() - enabled = true } // StartEmitting sends the current payload and begins the @@ -62,22 +43,6 @@ func Init(instanceID uuid.UUID, disabledMetricsKeys []string) { // This function panics if it was called more than once. // It is a no-op if this package was not initialized. func StartEmitting() { - if !enabled { - return - } - updateTimerMu.Lock() - if updateTimer != nil { - updateTimerMu.Unlock() - panic("updates already started") - } - updateTimerMu.Unlock() - updateMu.Lock() - if updating { - updateMu.Unlock() - panic("update already in progress") - } - updateMu.Unlock() - go logEmit(false) } // StopEmitting sends the current payload and terminates @@ -90,21 +55,10 @@ func StartEmitting() { // you want to guarantee no blocking at critical times // like exiting the program. func StopEmitting() { - if !enabled { - return - } - updateTimerMu.Lock() - if updateTimer == nil { - updateTimerMu.Unlock() - return - } - updateTimerMu.Unlock() - logEmit(true) // likely too early; may take minutes to return } // Reset empties the current payload buffer. func Reset() { - resetBuffer() } // Set puts a value in the buffer to be included @@ -116,19 +70,6 @@ func Reset() { // go keyword after the call to SendHello so it // doesn't block crucial code. func Set(key string, val interface{}) { - if !enabled || isDisabled(key) { - return - } - bufferMu.Lock() - if _, ok := buffer[key]; !ok { - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - bufferItemCount++ - } - buffer[key] = val - bufferMu.Unlock() } // SetNested puts a value in the buffer to be included @@ -140,36 +81,6 @@ func Set(key string, val interface{}) { // go keyword after the call to SendHello so it // doesn't block crucial code. func SetNested(key, subkey string, val interface{}) { - if !enabled || isDisabled(key) { - return - } - bufferMu.Lock() - if topLevel, ok1 := buffer[key]; ok1 { - topLevelMap, ok2 := topLevel.(map[string]interface{}) - if !ok2 { - bufferMu.Unlock() - log.Printf("[PANIC] Telemetry: key %s is already used for non-nested-map value", key) - return - } - if _, ok3 := topLevelMap[subkey]; !ok3 { - // don't exceed max buffer size - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - bufferItemCount++ - } - topLevelMap[subkey] = val - } else { - // don't exceed max buffer size - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - bufferItemCount++ - buffer[key] = map[string]interface{}{subkey: val} - } - bufferMu.Unlock() } // Append appends value to a list named key. @@ -177,29 +88,6 @@ func SetNested(key, subkey string, val interface{}) { // If key maps to a type that is not a list, // a panic is logged, and this is a no-op. func Append(key string, value interface{}) { - if !enabled || isDisabled(key) { - return - } - bufferMu.Lock() - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - // TODO: Test this... - bufVal, inBuffer := buffer[key] - sliceVal, sliceOk := bufVal.([]interface{}) - if inBuffer && !sliceOk { - bufferMu.Unlock() - log.Printf("[PANIC] Telemetry: key %s already used for non-slice value", key) - return - } - if sliceVal == nil { - buffer[key] = []interface{}{value} - } else if sliceOk { - buffer[key] = append(sliceVal, value) - } - bufferItemCount++ - bufferMu.Unlock() } // AppendUnique adds value to a set named key. @@ -213,30 +101,6 @@ func Append(key string, value interface{}) { // that is not a counting set, a panic is logged, // and this is a no-op. func AppendUnique(key string, value interface{}) { - if !enabled || isDisabled(key) { - return - } - bufferMu.Lock() - bufVal, inBuffer := buffer[key] - setVal, setOk := bufVal.(countingSet) - if inBuffer && !setOk { - bufferMu.Unlock() - log.Printf("[PANIC] Telemetry: key %s already used for non-counting-set value", key) - return - } - if setVal == nil { - // ensure the buffer is not too full, then add new unique value - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - buffer[key] = countingSet{value: 1} - bufferItemCount++ - } else if setOk { - // unique value already exists, so just increment counter - setVal[value]++ - } - bufferMu.Unlock() } // Add adds amount to a value named key. @@ -245,49 +109,22 @@ func AppendUnique(key string, value interface{}) { // is not an integer, a panic is logged, // and this is a no-op. func Add(key string, amount int) { - atomicAdd(key, amount) } // Increment is a shortcut for Add(key, 1) func Increment(key string) { - atomicAdd(key, 1) } // atomicAdd adds amount (negative to subtract) // to key. func atomicAdd(key string, amount int) { - if !enabled || isDisabled(key) { - return - } - bufferMu.Lock() - bufVal, inBuffer := buffer[key] - intVal, intOk := bufVal.(int) - if inBuffer && !intOk { - bufferMu.Unlock() - log.Printf("[PANIC] Telemetry: key %s already used for non-integer value", key) - return - } - if !inBuffer { - if bufferItemCount >= maxBufferItems { - bufferMu.Unlock() - return - } - bufferItemCount++ - } - buffer[key] = intVal + amount - bufferMu.Unlock() } // FastHash hashes input using a 32-bit hashing algorithm // that is fast, and returns the hash as a hex-encoded string. // Do not use this for cryptographic purposes. func FastHash(input []byte) string { - h := fnv.New32a() - if _, err := h.Write(input); err != nil { - log.Println("[ERROR] failed to write bytes: ", err) - } - - return fmt.Sprintf("%x", h.Sum32()) + return "" } // isDisabled returns whether key is @@ -295,16 +132,5 @@ func FastHash(input []byte) string { // functions should call this and not // save the value if this returns true. func isDisabled(key string) bool { - // for keys that are augmented with data, such as - // "tls_client_hello_ua:", just - // check the prefix "tls_client_hello_ua" - checkKey := key - if idx := strings.Index(key, ":"); idx > -1 { - checkKey = key[:idx] - } - - disabledMetricsMu.RLock() - _, ok := disabledMetrics[checkKey] - disabledMetricsMu.RUnlock() - return ok + return true } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 2aab30b8..165c8618 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -33,16 +33,7 @@ package telemetry import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "math/rand" "net/http" - "runtime" - "strconv" - "strings" "sync" "time" @@ -52,10 +43,6 @@ import ( // logEmit calls emit and then logs the error, if any. // See docs for emit. func logEmit(final bool) { - err := emit(final) - if err != nil { - log.Printf("[ERROR] Sending telemetry: %v", err) - } } // emit sends an update to the telemetry server. @@ -63,184 +50,15 @@ func logEmit(final bool) { // If final is true, no future updates will be scheduled. // Otherwise, the next update will be scheduled. func emit(final bool) error { - if !enabled { - return fmt.Errorf("telemetry not enabled") - } - - // some metrics are updated/set at time of emission - setEmitTimeMetrics() - - // ensure only one update happens at a time; - // skip update if previous one still in progress - updateMu.Lock() - if updating { - updateMu.Unlock() - log.Println("[NOTICE] Skipping this telemetry update because previous one is still working") - return nil - } - updating = true - updateMu.Unlock() - defer func() { - updateMu.Lock() - updating = false - updateMu.Unlock() - }() - - // terminate any pending update if this is the last one - if final { - stopUpdateTimer() - } - - payloadBytes, err := makePayloadAndResetBuffer() - if err != nil { - return err - } - - // this will hold the server's reply - var reply Response - - // transmit the payload - use a loop to retry in case of failure - for i := 0; i < 4; i++ { - if i > 0 && err != nil { - // don't hammer the server; first failure might have been - // a fluke, but back off more after that - log.Printf("[WARNING] Sending telemetry (attempt %d): %v - backing off and retrying", i, err) - time.Sleep(time.Duration((i+1)*(i+1)*(i+1)) * time.Second) - } - - // send it - var resp *http.Response - resp, err = httpClient.Post(endpoint+instanceUUID.String(), "application/json", bytes.NewReader(payloadBytes)) - if err != nil { - continue - } - - // check for any special-case response codes - if resp.StatusCode == http.StatusGone { - // the endpoint has been deprecated and is no longer servicing clients - err = fmt.Errorf("telemetry server replied with HTTP %d; upgrade required", resp.StatusCode) - if clen := resp.Header.Get("Content-Length"); clen != "0" && clen != "" { - bodyBytes, readErr := ioutil.ReadAll(resp.Body) - if readErr != nil { - log.Printf("[ERROR] Reading response body from server: %v", readErr) - } - err = fmt.Errorf("%v - %s", err, bodyBytes) - } - resp.Body.Close() - reply.Stop = true - break - } - if resp.StatusCode == http.StatusUnavailableForLegalReasons { - // the endpoint is unavailable, at least to this client, for legal reasons (!) - err = fmt.Errorf("telemetry server replied with HTTP %d %s: please consult the project website and developers for guidance", resp.StatusCode, resp.Status) - if clen := resp.Header.Get("Content-Length"); clen != "0" && clen != "" { - bodyBytes, readErr := ioutil.ReadAll(resp.Body) - if readErr != nil { - log.Printf("[ERROR] Reading response body from server: %v", readErr) - } - err = fmt.Errorf("%v - %s", err, bodyBytes) - } - resp.Body.Close() - reply.Stop = true - break - } - - // okay, ensure we can interpret the response - if ct := resp.Header.Get("Content-Type"); (resp.StatusCode < 300 || resp.StatusCode >= 400) && - !strings.Contains(ct, "json") { - err = fmt.Errorf("telemetry server replied with unknown content-type: '%s' and HTTP %s", ct, resp.Status) - resp.Body.Close() - continue - } - - // read the response body - err = json.NewDecoder(resp.Body).Decode(&reply) - resp.Body.Close() // close response body as soon as we're done with it - if err != nil { - continue - } - - // update the list of enabled/disabled keys, if any - for _, key := range reply.EnableKeys { - disabledMetricsMu.Lock() - // only re-enable this metric if it is temporarily disabled - if temp, ok := disabledMetrics[key]; ok && temp { - delete(disabledMetrics, key) - } - disabledMetricsMu.Unlock() - } - for _, key := range reply.DisableKeys { - disabledMetricsMu.Lock() - disabledMetrics[key] = true // all remotely-disabled keys are "temporarily" disabled - disabledMetricsMu.Unlock() - } - - // make sure we didn't send the update too soon; if so, - // just wait and try again -- this is a special case of - // error that we handle differently, as you can see - if resp.StatusCode == http.StatusTooManyRequests { - if reply.NextUpdate <= 0 { - raStr := resp.Header.Get("Retry-After") - if ra, err := strconv.Atoi(raStr); err == nil { - reply.NextUpdate = time.Duration(ra) * time.Second - } - } - if !final { - log.Printf("[NOTICE] Sending telemetry: we were too early; waiting %s before trying again", reply.NextUpdate) - time.Sleep(reply.NextUpdate) - continue - } - } else if resp.StatusCode >= 400 { - err = fmt.Errorf("telemetry server returned status code %d", resp.StatusCode) - continue - } - - break - } - if err == nil && !final { - // (remember, if there was an error, we return it - // below, so it WILL get logged if it's supposed to) - log.Println("[INFO] Sending telemetry: success") - } - - // even if there was an error after all retries, we should - // schedule the next update using our default update - // interval because the server might be healthy later - - // ensure we won't slam the telemetry server; add a little variance - if reply.NextUpdate < 1*time.Second { - reply.NextUpdate = defaultUpdateInterval + time.Duration(rand.Int63n(int64(1*time.Minute))) - } - - // schedule the next update (if this wasn't the last one and - // if the remote server didn't tell us to stop sending) - if !final && !reply.Stop { - updateTimerMu.Lock() - updateTimer = time.AfterFunc(reply.NextUpdate, func() { - logEmit(false) - }) - updateTimerMu.Unlock() - } - - return err + return nil } func stopUpdateTimer() { - updateTimerMu.Lock() - updateTimer.Stop() - updateTimer = nil - updateTimerMu.Unlock() } // setEmitTimeMetrics sets some metrics that should // be recorded just before emitting. func setEmitTimeMetrics() { - Set("goroutines", runtime.NumGoroutine()) - - var mem runtime.MemStats - runtime.ReadMemStats(&mem) - SetNested("memory", "heap_alloc", mem.HeapAlloc) - SetNested("memory", "sys", mem.Sys) } // makePayloadAndResetBuffer prepares a payload @@ -250,15 +68,7 @@ func setEmitTimeMetrics() { // resulting byte slice is lost, the payload is // gone with it. func makePayloadAndResetBuffer() ([]byte, error) { - bufCopy := resetBuffer() - - // encode payload in preparation for transmission - payload := Payload{ - InstanceID: instanceUUID.String(), - Timestamp: time.Now().UTC(), - Data: bufCopy, - } - return json.Marshal(payload) + return make([]byte, 0), nil } // resetBuffer makes a local pointer to the buffer, @@ -268,12 +78,7 @@ func makePayloadAndResetBuffer() ([]byte, error) { // the original map so the old buffer value can be // used locally. func resetBuffer() map[string]interface{} { - bufferMu.Lock() - bufCopy := buffer - buffer = make(map[string]interface{}) - bufferItemCount = 0 - bufferMu.Unlock() - return bufCopy + return make(map[string]interface{}) } // Response contains the body of a response from the @@ -324,13 +129,6 @@ type Payload struct { // Int returns the value of the data keyed by key // if it is an integer; otherwise it returns 0. func (p Payload) Int(key string) int { - val, _ := p.Data[key] - switch p.Data[key].(type) { - case int: - return val.(int) - case float64: // after JSON-decoding, int becomes float64... - return int(val.(float64)) - } return 0 } @@ -345,17 +143,7 @@ type countingSet map[interface{}]int // are JSON object values instead of keys, since keys // are difficult to query in databases. func (s countingSet) MarshalJSON() ([]byte, error) { - type Item struct { - Value interface{} `json:"value"` - Count int `json:"count"` - } - var list []Item - - for k, v := range s { - list = append(list, Item{Value: k, Count: v}) - } - - return json.Marshal(list) + return make([]byte, 0), nil } var ( @@ -415,7 +203,7 @@ var ( const ( // endpoint is the base URL to remote telemetry server; // the instance ID will be appended to it. - endpoint = "https://telemetry.caddyserver.com/v1/update/" + endpoint = "" // defaultUpdateInterval is how long to wait before emitting // more telemetry data if all retires fail. This value is -- 2.21.0