From 60b77330042649b189f170ab1ebe2f70a9085aea Mon Sep 17 00:00:00 2001 From: "zhiqiang.shi" Date: Thu, 28 Jan 2021 17:12:36 +0800 Subject: [PATCH 1/2] Nginx Lua agent 0.1.0 support --- .gitignore | 1 + README.md | 3 +- ...=> kong-plugin-skywalking-0.1.0-1.rockspec | 5 +- kong/plugins/skywalking/client.lua | 96 +++++++++++---- .../skywalking/correlation_context.lua | 115 ------------------ kong/plugins/skywalking/management.lua | 45 +++++-- kong/plugins/skywalking/segment_ref.lua | 49 ++++++-- kong/plugins/skywalking/span.lua | 6 +- kong/plugins/skywalking/tracer.lua | 12 +- kong/plugins/skywalking/tracing_context.lua | 28 +---- 10 files changed, 170 insertions(+), 190 deletions(-) create mode 100644 .gitignore rename kong-plugin-skywalking-0.2.0-1.rockspec => kong-plugin-skywalking-0.1.0-1.rockspec (90%) delete mode 100644 kong/plugins/skywalking/correlation_context.lua diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e43b0f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/README.md b/README.md index dcceb89..194559c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # kong-plugin-skywalking -The Nginx Lua agent 0.2.0 for Apache SkyWalking 8 kong-plugin + +The Nginx Lua agent 0.1.0 for Apache SkyWalking 8 kong-plugin diff --git a/kong-plugin-skywalking-0.2.0-1.rockspec b/kong-plugin-skywalking-0.1.0-1.rockspec similarity index 90% rename from kong-plugin-skywalking-0.2.0-1.rockspec rename to kong-plugin-skywalking-0.1.0-1.rockspec index 44a4a0b..df8294e 100644 --- a/kong-plugin-skywalking-0.2.0-1.rockspec +++ b/kong-plugin-skywalking-0.1.0-1.rockspec @@ -1,13 +1,13 @@ package = "kong-plugin-skywalking" -version = "0.2.0-1" +version = "0.1.0-1" local pluginName = "skywalking" supported_platforms = {"linux", "macosx"} source = { url = "https://github.com/polaris-liu/kong-plugin-skywalking.git", - tag = "0.2.0" + tag = "0.1.0" } description = { @@ -26,7 +26,6 @@ build = { ["kong.plugins."..pluginName..".handler"] = "kong/plugins/"..pluginName.."/handler.lua", ["kong.plugins."..pluginName..".schema"] = "kong/plugins/"..pluginName.."/schema.lua", ["kong.plugins."..pluginName..".client"] = "kong/plugins/"..pluginName.."/client.lua", - ["kong.plugins."..pluginName..".correlation_context"] = "kong/plugins/"..pluginName.."/correlation_context.lua", ["kong.plugins."..pluginName..".management"] = "kong/plugins/"..pluginName.."/management.lua", ["kong.plugins."..pluginName..".segment"] = "kong/plugins/"..pluginName.."/segment.lua", ["kong.plugins."..pluginName..".segment_ref"] = "kong/plugins/"..pluginName.."/segment_ref.lua", diff --git a/kong/plugins/skywalking/client.lua b/kong/plugins/skywalking/client.lua index 877b3b7..f1ff68f 100644 --- a/kong/plugins/skywalking/client.lua +++ b/kong/plugins/skywalking/client.lua @@ -15,6 +15,8 @@ -- limitations under the License. -- +local timestamp = require "kong.tools.timestamp" + local SEGMENT_BATCH_COUNT = 100 local Client = {} @@ -37,14 +39,25 @@ function Client:startBackendTimer(config) check = function(premature) if not premature then - local instancePropertiesSubmitted = metadata_buffer:get('sw_instancePropertiesSubmitted') - if (instancePropertiesSubmitted == nil or instancePropertiesSubmitted == false) then - self:reportServiceInstance(metadata_buffer, config) - else - self:ping(metadata_buffer, config) + local serviceId = metadata_buffer:get('serviceId') + if (serviceId == nil or serviceId == 0) then + self:registerService(metadata_buffer, config) + end + + serviceId = metadata_buffer:get('serviceId') + if (serviceId ~= nil and serviceId ~= 0) then + local serviceInstId = metadata_buffer:get('serviceInstId') + if (serviceInstId == nil or serviceInstId == 0) then + self:registerServiceInstance(metadata_buffer, config) + end end - self:reportTraces(metadata_buffer, config) + -- After all register successfully, begin to send trace segments + local serviceInstId = metadata_buffer:get('serviceInstId') + if (serviceInstId ~= nil and serviceInstId ~= 0) then + self:reportTraces(metadata_buffer, config) + self:ping(metadata_buffer, config) + end -- do the health check local ok, err = new_timer(delay, check) @@ -66,13 +79,55 @@ function Client:startBackendTimer(config) end end -function Client:reportServiceInstance(metadata_buffer, config) +-- Register service +function Client:registerService(metadata_buffer, config) - local service_name = config.service_name - local service_instance_name = config.service_instance_name + local serviceName = config.service_name + + local cjson = require('cjson') + local serviceRegister = require("kong.plugins.skywalking.management").newServiceRegister(serviceName) + local serviceRegisterParam = cjson.encode(serviceRegister) + + local http = require('resty.http') + local httpc = http.new() + local res, err = httpc:request_uri(config.backend_http_uri .. '/v2/service/register', { + method = "POST", + body = serviceRegisterParam, + headers = { + ["Content-Type"] = "application/json", + }, + }) + + if not res then + log(ERR, "Service register fails, " .. err) + elseif res.status == 200 then + log(DEBUG, "Service register response = " .. res.body) + local registerResults = cjson.decode(res.body) + + for i, result in ipairs(registerResults) + do + if result.key == serviceName then + local serviceId = result.value + log(DEBUG, "Service registered, service id = " .. serviceId) + metadata_buffer:set('serviceId', serviceId) + end + end + else + log(ERR, "Service register fails, response code " .. res.status) + end +end + +function Client:registerServiceInstance(metadata_buffer, config) + + local service_id = metadata_buffer:get("serviceId") + local serviceInstName = 'NAME:' .. config.service_instance_name + metadata_buffer:set('serviceInstanceUUID', serviceInstName) local cjson = require('cjson') - local reportInstance = require("kong.plugins.skywalking.management").newReportInstanceProperties(service_name, service_instance_name) + local reportInstance = require("kong.plugins.skywalking.management").newServiceInstanceRegister( + service_id, + serviceInstName, + timestamp.get_utc()) local reportInstanceParam, err = cjson.encode(reportInstance) if err then log.err("Request to report instance fails, ", err) @@ -81,7 +136,7 @@ function Client:reportServiceInstance(metadata_buffer, config) local http = require('resty.http') local httpc = http.new() - local uri = config.backend_http_uri .. '/v3/management/reportProperties' + local uri = config.backend_http_uri .. '/v2/instance/register' local res, err = httpc:request_uri(uri, { method = "POST", @@ -92,10 +147,10 @@ function Client:reportServiceInstance(metadata_buffer, config) }) if not res then - log.err("Instance report fails, uri:", uri, ", err:", err) + log.err("Service Instance register fails, uri:", uri, ", err:", err) elseif res.status == 200 then - log.debug("Instance report, uri:", uri, ", response = ", res.body) - metadata_buffer:set('sw_instancePropertiesSubmitted', true) + log.debug("Service Instance report, uri:", uri, ", response = ", res.body) + metadata_buffer:set('serviceInstId', serviceId) else log.err("Instance report fails, uri:", uri, ", response code ", res.status) end @@ -103,12 +158,11 @@ end -- Ping the backend to update instance heartheat function Client:ping(metadata_buffer, config) - - local service_name = config.service_name - local service_instance_name = config.service_instance_name - local cjson = require('cjson') - local pingPkg = require("kong.plugins.skywalking.management").newServiceInstancePingPkg(service_name, service_instance_name) + local pingPkg = require("kong.plugins.skywalking.management").newServiceInstancePingPkg( + metadata_buffer:get('serviceInstId'), + metadata_buffer:get('serviceInstanceUUID'), + timestamp.get_utc()) local pingPkgParam, err = cjson.encode(pingPkg) if err then log.err("Agent ping fails, ", err) @@ -116,7 +170,7 @@ function Client:ping(metadata_buffer, config) local http = require('resty.http') local httpc = http.new() - local uri = config.backend_http_uri .. '/v3/management/keepAlive' + local uri = config.backend_http_uri .. '/v2/instance/heartbeat' local res, err = httpc:request_uri(uri, { method = "POST", @@ -141,7 +195,7 @@ local function sendSegments(segmentTransform, backend_http_uri) local http = require('resty.http') local httpc = http.new() - local uri = backend_http_uri .. '/v3/segments' + local uri = backend_http_uri .. '/v2/segments' local res, err = httpc:request_uri(uri, { method = "POST", diff --git a/kong/plugins/skywalking/correlation_context.lua b/kong/plugins/skywalking/correlation_context.lua deleted file mode 100644 index 7dd1935..0000000 --- a/kong/plugins/skywalking/correlation_context.lua +++ /dev/null @@ -1,115 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - --- limit define -local ELEMENT_MAX_NUMBER = 3 -local VALUE_MAX_LENGTH = 128 - -local k_utils = require "kong.tools.utils" -local encode_base64 = ngx.encode_base64 -local decode_base64 = ngx.decode_base64 - -local _M = {} - -function _M.new() - return {} -end - --- Deserialze value from the correlation context and initalize the context -function _M.fromSW8Value(value) - local context = _M.new() - - if value == nil or #value == 0 then - return context - end - - local data = k_utils.split(value, ',') - if #data == 0 then - return context - end - - for i, per_data in ipairs(data) - do - if #data > ELEMENT_MAX_NUMBER then - return context - end - - local parts = k_utils.split(per_data, ':') - if #parts == 2 then - local key = decode_base64(parts[1]) - local value = decode_base64(parts[2]) - - context[key] = value - end - end - - return context -end - --- Return string to represent this correlation context -function _M.serialize(context) - local encoded = '' - for name, value in pairs(context) do - if #encoded > 0 then - encoded = encoded .. ',' - end - - encoded = encoded .. encode_base64(name) .. ':' .. encode_base64(value) - end - - return encoded -end - --- Put the custom key/value into correlation context. -function _M.put(context, key, value) - -- key must not null - if not key then - return - end - - -- remove and return previous value when value is empty - if not value or #value == 0 then - context[key] = nil - return - end - - -- check value length - if #value > VALUE_MAX_LENGTH then - return - end - - -- already contain key, overwrite it - if context[key] then - context[key] = value - return - end - - - -- check keys count - local contextLength = 0 - for k,v in pairs(context) do - contextLength = contextLength + 1 - end - if contextLength >= ELEMENT_MAX_NUMBER then - return - end - - -- setting - context[key] = value -end - -return _M \ No newline at end of file diff --git a/kong/plugins/skywalking/management.lua b/kong/plugins/skywalking/management.lua index ccca53e..4c68e52 100644 --- a/kong/plugins/skywalking/management.lua +++ b/kong/plugins/skywalking/management.lua @@ -18,23 +18,52 @@ local _M = {} -- Return Services as service register parameter -function _M.newReportInstanceProperties(serviceName, serviceInstance) +function _M.newServiceRegister(unRegisterServiceName) + local serv = { + services = {} + } + + local service = { + serviceName = unRegisterServiceName, + -- Field type is optional, default value is `normal` + type = 'normal' + } + + serv.services[#serv.services + 1] = service + + return serv +end + +-- Return Services as service register parameter +function _M.newServiceInstanceRegister(registeredServiceId, serviceInstUUID, registerTime) + local serviceInstances = { + instances = {} + } + local allProperties = { key = "language", value = "lua" } - return { - service = serviceName, - serviceInstance = serviceInstance, - properties = {allProperties} + local serviceInstance = { + serviceId = registeredServiceId, + instanceUUID = serviceInstUUID, + time = registerTime, + properties = {} } + + serviceInstance.properties[#serviceInstance.properties + 1] = {allProperties} + + serviceInstances.instances[#serviceInstances.instances + 1] = serviceInstance + + return serviceInstances end -function _M.newServiceInstancePingPkg(serviceName, serviceInstance) +function _M.newServiceInstancePingPkg(serviceInstanceId, serviceInstanceUUID, updateTime) return { - service = serviceName, - serviceInstance = serviceInstance, + serviceInstanceId = serviceInstanceId, + time = updateTime, + serviceInstanceUUID = serviceInstanceUUID, } end diff --git a/kong/plugins/skywalking/segment_ref.lua b/kong/plugins/skywalking/segment_ref.lua index 7432dda..ea91981 100644 --- a/kong/plugins/skywalking/segment_ref.lua +++ b/kong/plugins/skywalking/segment_ref.lua @@ -34,25 +34,56 @@ local _M = {} function _M.new() return { type = 'CROSS_PROCESS', + network_address_id = 0, + entry_service_instance_id = 0, + parent_service_instance_id = 0, + entry_endpoint_id = 0, + parent_endpoint_id = 0, } end --- Deserialize value from the propagated context and initialize the SegmentRef -function _M.fromSW8Value(value) +-- Format a trace/segment id into an array. +-- An official ID should have three parts separated by '.' and each part of it is a number +function _M.formatID(str) + local parts = split(str, [[\.]]) + if #parts ~= 3 then + return nil + end + + return parts +end + +function _M.fromSW6Value(value) local ref = _M.new() local parts = k_utils.split(value, '-') - if #parts ~= 8 then + if #parts ~= 9 then return nil end - ref.trace_id = decode_base64(parts[2]) - ref.segment_id = decode_base64(parts[3]) + ref.trace_id = formatID(decode_base64(parts[2])) + ref.segment_id = formatID(decode_base64(parts[3])) ref.span_id = tonumber(parts[4]) - ref.parent_service = decode_base64(parts[5]) - ref.parent_service_instance = decode_base64(parts[6]) - ref.parent_endpoint = decode_base64(parts[7]) - ref.address_used_at_client = decode_base64(parts[8]) + ref.parent_service_instance_id = tonumber(parts[5]) + ref.entry_service_instance_id = tonumber(parts[6]) + local peerStr = decode_base64(parts[7]) + if string.sub(peerStr, 1, 1) == '#' then + ref.network_address = string.sub(peerStr, 2) + else + ref.network_address_id = tonumber(peerStr) + end + local entryEndpointStr = decode_base64(parts[8]) + if string.sub(entryEndpointStr, 1, 1) == '#' then + ref.entry_endpoint_name = string.sub(entryEndpointStr, 2) + else + ref.entry_endpoint_id = tonumber(entryEndpointStr) + end + local parentEndpointStr = decode_base64(parts[9]) + if string.sub(parentEndpointStr, 1, 1) == '#' then + ref.parent_endpoint_name = string.sub(parentEndpointStr, 2) + else + ref.parent_endpoint_id = tonumber(parentEndpointStr) + end return ref end diff --git a/kong/plugins/skywalking/span.lua b/kong/plugins/skywalking/span.lua index 5a32244..3b74b81 100644 --- a/kong/plugins/skywalking/span.lua +++ b/kong/plugins/skywalking/span.lua @@ -18,7 +18,7 @@ local timestamp = require "kong.tools.timestamp" local spanLayer = require("kong.plugins.skywalking.span_layer") local SegmentRef = require("kong.plugins.skywalking.segment_ref") -local CONTEXT_CARRIER_KEY = 'sw8' +local CONTEXT_CARRIER_KEY = 'sw6' local _M = {} -- local Span = { @@ -62,7 +62,7 @@ local _M = {} -- } -- Create an entry span. Represent the HTTP incoming request. --- @param contextCarrier, HTTP request header, which could carry the `sw8` context +-- @param contextCarrier, HTTP request header, which could carry the `sw6` context function _M.createEntrySpan(operationName, context, parent, contextCarrier) local span = _M.new(operationName, context, parent) span.is_entry = true @@ -70,7 +70,7 @@ function _M.createEntrySpan(operationName, context, parent, contextCarrier) if contextCarrier ~= nil then local propagatedContext = contextCarrier[CONTEXT_CARRIER_KEY] if propagatedContext ~= nil then - local ref = SegmentRef.fromSW8Value(propagatedContext) + local ref = SegmentRef.fromSW6Value(propagatedContext) if ref ~= nil then -- If current trace id is generated by the context, in LUA case, mostly are yes -- use the ref trace id to override it, in order to keep trace id consistently same. diff --git a/kong/plugins/skywalking/tracer.lua b/kong/plugins/skywalking/tracer.lua index 88d1c74..bb54e62 100644 --- a/kong/plugins/skywalking/tracer.lua +++ b/kong/plugins/skywalking/tracer.lua @@ -23,18 +23,16 @@ function Tracer:start(config, correlation) local TC = require('kong.plugins.skywalking.tracing_context') local Layer = require('kong.plugins.skywalking.span_layer') - local tracingContext - local service_name = config.service_name - local service_instance_name = config.service_instance_name - tracingContext = TC.new(service_name, service_instance_name) + local serviceInstId = metadata_buffer:get("serviceInstId") + local serviceId = metadata_buffer:get('serviceId') + local tracingContext = TC.new(serviceId, serviceInstId) -- Constant pre-defined in SkyWalking main repo -- 6000 represents Nginx local nginxComponentId = 6000 local contextCarrier = {} - contextCarrier["sw8"] = ngx.req.get_headers()["sw8"] - contextCarrier["sw8-correlation"] = ngx.req.get_headers()["sw8-correlation"] + contextCarrier["sw6"] = ngx.req.get_headers()["sw6"] local entrySpan = TC.createEntrySpan(tracingContext, ngx.var.uri, nil, contextCarrier) Span.start(entrySpan, ngx.now() * 1000) Span.setComponentId(entrySpan, nginxComponentId) @@ -50,7 +48,7 @@ function Tracer:start(config, correlation) local upstreamServerName = kong.request.get_host() ------------------------------------------------------ - local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier, correlation) + local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier) Span.start(exitSpan, ngx.now() * 1000) Span.setComponentId(exitSpan, nginxComponentId) Span.setLayer(exitSpan, Layer.HTTP) diff --git a/kong/plugins/skywalking/tracing_context.lua b/kong/plugins/skywalking/tracing_context.lua index f02a96f..4ebf0ed 100644 --- a/kong/plugins/skywalking/tracing_context.lua +++ b/kong/plugins/skywalking/tracing_context.lua @@ -19,8 +19,6 @@ local k_utils = require "kong.tools.utils" local Span = require('kong.plugins.skywalking.span') local CorrelationContext = require('kong.plugins.skywalking.correlation_context') -local CONTEXT_CORRELATION_KEY = 'sw8-correlation' - -------------- Internal Object------------- local Internal = {} -- Internal Object hosts the methods for SkyWalking LUA internal APIs only. @@ -107,16 +105,16 @@ function _M.newNoOP() return {is_noop = true} end -function _M.new(serviceName, serviceInstanceName) - if serviceInstanceName == nil or serviceName == nil then +function _M.new(serviceId, serviceInstID) + if serviceInstID == nil then return _M.newNoOP() end local tracing_context = {} tracing_context.trace_id = k_utils.uuid() tracing_context.segment_id = tracing_context.trace_id - tracing_context.service = serviceName - tracing_context.service_instance = serviceInstanceName + tracing_context.service_id = serviceId + tracing_context.service_inst_id = serviceInstID tracing_context.internal = Internal.new() tracing_context.internal.owner = tracing_context return tracing_context @@ -129,32 +127,16 @@ function _M.createEntrySpan(tracingContext, operationName, parent, contextCarrie return Span.newNoOP() end - local correlationData = '' - if contextCarrier then - correlationData = contextCarrier[CONTEXT_CORRELATION_KEY] - end - tracingContext.correlation = CorrelationContext.fromSW8Value(correlationData) - return Span.createEntrySpan(operationName, tracingContext, parent, contextCarrier) end -- Delegate to Span.createExitSpan -- @param contextCarrier could be nil if don't need to inject any context to propagate -function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier, correlation) +function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier,) if tracingContext.is_noop then return Span.newNoOP() end - if contextCarrier then - if correlation then - for name, value in pairs(correlation) do - CorrelationContext.put(tracingContext.correlation, name, value) - end - end - - contextCarrier[CONTEXT_CORRELATION_KEY] = CorrelationContext.serialize(tracingContext.correlation) - end - return Span.createExitSpan(operationName, tracingContext, parent, peer, contextCarrier) end From 15794eb865d66813b0777eb9fe7c69a9d88d1c1a Mon Sep 17 00:00:00 2001 From: "zhiqiang.shi" Date: Thu, 28 Jan 2021 17:17:01 +0800 Subject: [PATCH 2/2] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 194559c..fe6b28e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # kong-plugin-skywalking -The Nginx Lua agent 0.1.0 for Apache SkyWalking 8 kong-plugin +The Nginx Lua agent 0.1.0 for Apache SkyWalking 7 kong-plugin