n9e的http框架

n9e的http框架使用 #

Web流程 #

  1. 初始化http.Server
  2. 配置logging,recovery,session中间件
  3. 装载路由routers
  4. 启动http服务
  5. 关闭处理平滑关闭
web服务源码
// server object
var srv = &http.Server{
	ReadTimeout:    10 * time.Second,
	WriteTimeout:   10 * time.Second,
	MaxHeaderBytes: 1 << 20,
}

// start server
func Start() {
    // get global config
	c := config.Get()

    // init middlewares
	loggerMid := middleware.LoggerWithConfig(middleware.LoggerConfig{SkipPaths: skipPaths})
	recoveryMid := middleware.Recovery()
	store := cookie.NewStore([]byte(c.HTTP.Secret))
	sessionMid := sessions.Sessions("falcon-ng", store)

    // debug mode?
	if c.Logger.Level != "DEBUG" {
		gin.SetMode(gin.ReleaseMode)
		middleware.DisableConsoleColor()
	}

    // gin instance && use middlewares
	r := gin.New()
	r.Use(loggerMid, recoveryMid, sessionMid)

    // init routers
	routes.Config(r)

    // use address toolkit to get monapi server's listen address
    srv.Addr = address.GetHTTPListen("monapi")
    
    // assign gin handler to server
	srv.Handler = r

    // use go routine to start http server
	go func() {
		log.Println("starting http server, listening on:", srv.Addr)
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("listening %s occur error: %s\n", srv.Addr, err)
		}
	}()
}

// graceful shutdown http server with timeout 5s
func Shutdown() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatalln("cannot shutdown http server:", err)
	}
}

// close all
func ending() {
    // trap signals
    c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	select {
	case <-c:
		fmt.Printf("stop signal caught, stopping... pid=%d\n", os.Getpid())
	}

	logger.Close()
	http.Shutdown()
	fmt.Println("portal stopped successfully")
}

Web中间件 #

日志 #

该日志中间件原本是gin的默认日志中间件,n9e中单独拎出来是为了:

  1. 添加自己需要的字段,比如打印Body
  2. 使用全局的logger日志包打日志

gin的默认日志中间件,在输出媒介,只支持传入io.Writer,默认os.Stdout,而logger包没有实现io.Writer接口的对象,只能侵入中间件进行修改。

日志中间件源码
type LogFormatter func(params LogFormatterParams) string

type LoggerConfig struct {
	// Optional. Default value is gin.defaultLogFormatter
	Formatter LogFormatter

	// Output is a writer where logs are written.
	// Optional. Default value is gin.DefaultWriter.
	Output io.Writer

	// SkipPaths is a url path array which logs are not written.
	// Optional.
	SkipPaths []string
}

// 默认日志格式,通过ANSI颜色控制
var defaultLogFormatter = func(param LogFormatterParams) string {
	var statusColor, methodColor, resetColor string
	if param.IsOutputColor() {
		// 
		statusColor = param.StatusCodeColor()
		methodColor = param.MethodColor()
		resetColor = param.ResetColor()
	}

	if param.Latency > time.Minute {
		// Truncate in a golang < 1.8 safe way
		param.Latency = param.Latency - param.Latency%time.Second
	}

	return fmt.Sprintf("[GIN] |%s %3d %s| %13v | %15s |%s %-7s %s %s\n%s",
		statusColor, param.StatusCode, resetColor,
		param.Latency,
		param.ClientIP,
		methodColor, param.Method, resetColor,
		param.Path,
		param.ErrorMessage,
	)
}

func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
	// 配置日志格式化
	formatter := conf.Formatter
	if formatter == nil {
		formatter = defaultLogFormatter
	}

    // 配置日志输出,默认标准输出
	out := conf.Output
	if out == nil {
		out = os.Stdout
	}



	// 判断是否是term,决定是否显示颜色
	isTerm := true

	if w, ok := out.(*os.File); !ok || os.Getenv("TERM") == "dumb" ||
		(!isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd())) {
		isTerm = false
	}

	// 跳过特定uri
	notlogged := conf.SkipPaths

	var skip map[string]struct{}

	if length := len(notlogged); length > 0 {
		skip = make(map[string]struct{}, length)

		for _, path := range notlogged {
			skip[path] = struct{}{}
		}
	}

	// 因为是实现中间件,所以必须返回handlerFunc
	return func(c *gin.Context) {
		// 定时器开启
		start := time.Now()
		// 获取url
		path := c.Request.URL.Path
		// 获取原始请求参数
		raw := c.Request.URL.RawQuery

		var (
			rdr1 io.ReadCloser
			rdr2 io.ReadCloser
		)

		// 获取body,
		if c.Request.Method != "GET" {
			// 读取body,得到buf
			buf, _ := ioutil.ReadAll(c.Request.Body)
			// 将Buffer实例转换为io.ReadCloser实例
			// rdr1没必要转换,下面用的时候,也只需要实现io.Reader就行了,甚至不需要转换,直接拿buf给下面用就好了
			rdr1 = ioutil.NopCloser(bytes.NewBuffer(buf))
			// rdr2需要转换为io.ReaderCloser,赋值给Body
			rdr2 = ioutil.NopCloser(bytes.NewBuffer(buf))

			c.Request.Body = rdr2
		}

		// Process request
		c.Next()

		// 将请求相关信息打印出来
		if _, ok := skip[path]; !ok {
			param := LogFormatterParams{
				Request: c.Request,
				isTerm:  isTerm,
				Keys:    c.Keys,
			}

			// 记录当钱时间戳
			param.TimeStamp = time.Now()
			// 计算延迟,并非响应延迟
			param.Latency = param.TimeStamp.Sub(start)

			param.ClientIP = c.ClientIP()
			param.Method = c.Request.Method
			param.StatusCode = c.Writer.Status()
			param.ErrorMessage = c.Errors.ByType(gin.ErrorTypePrivate).String()

			param.BodySize = c.Writer.Size()

			if raw != "" {
				path = path + "?" + raw
			}

			param.Path = path

			// fmt.Fprint(out, formatter(param))
			logger.Info(formatter(param))

			if c.Request.Method != "GET" {
				logger.Info(readBody(rdr1))
			}
		}
	}
}

func readBody(reader io.Reader) string {
	buf := new(bytes.Buffer)
	buf.ReadFrom(reader)

	s := buf.String()
	return s
}

// 如果是线上运行,那么就清除颜色
func DisableConsoleColor() {
	consoleColorMode = disableColor
}

这一块就是gin的日志中间件:

  1. 根据http信息格式化日志
  2. 打印http调试信息

知识点:

  1. 中间件写法,传入context,返回handlerFunc,通过c.Next继续处理请求,然后中间件继续执行
  2. 日志处理写法
  3. io操作,针对于流处理,比如Body,既要日志处理Body信息,又要不影响Body后续处理,需要对Body流进行拷贝,io.ReadAll读取为字节,再将字节转换为buffer,再将buffer转换为ioutil.NopCloser,使其能够被重新赋给Body
  4. golang对于颜色显示的处理,这里使用字节数组,当然直接使用转义字符/字符串会更加直观

异常恢复 #

recovery中间件,表面上是处理异常恢复,实际上是针对http异常状态的一个统一处理。

recovery中间件源码
// 入口,传入os.Stderr
func Recovery() gin.HandlerFunc {
	return RecoveryWithWriter(gin.DefaultErrorWriter)
}

// 
func RecoveryWithWriter(out io.Writer) gin.HandlerFunc {
	var logger *log.Logger

	if out != nil {
		logger = log.New(out, "\n\n\x1b[31m", log.LstdFlags)
	}

	return func(c *gin.Context) {
		// 注册一个defer func,所有请求最终都会进入
		defer func() {
			if err := recover(); err != nil {
				// custom error,页面错误均返回状态码200,然后打印错误信息,没必要使用status code来暴露错误
				if e, ok := err.(errors.PageError); ok {
					c.JSON(200, gin.H{"err": e.Message})
					c.Abort()
					return
				}

				// Check for a broken connection, as it is not really a
				// condition that warrants a panic stack trace.
				// 检测连接错误
				var brokenPipe bool
				if ne, ok := err.(*net.OpError); ok {
					if se, ok := ne.Err.(*os.SyscallError); ok {
						if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
							brokenPipe = true
						}
					}
				}
				if logger != nil {
					stack := stack(3)
					httpRequest, _ := httputil.DumpRequest(c.Request, false)
					headers := strings.Split(string(httpRequest), "\r\n")
					for idx, header := range headers {
						current := strings.Split(header, ":")
						if current[0] == "Authorization" {
							headers[idx] = current[0] + ": *"
						}
					}
					if brokenPipe {
						logger.Printf("%s\n%s%s", err, string(httpRequest), reset)
					} else if gin.IsDebugging() {
						logger.Printf("[Recovery] %s panic recovered:\n%s\n%s\n%s%s",
							timeFormat(time.Now()), strings.Join(headers, "\r\n"), err, stack, reset)
					} else {
						logger.Printf("[Recovery] %s panic recovered:\n%s\n%s%s",
							timeFormat(time.Now()), err, stack, reset)
					}
				}

				// If the connection is dead, we can't write a status to it.
				if brokenPipe {
					c.Error(err.(error)) // nolint: errcheck
					c.Abort()
				} else {
					c.AbortWithStatus(http.StatusInternalServerError)
				}
			}
		}()
		c.Next()
	}
}

func stack(skip int) []byte {
	buf := new(bytes.Buffer) // the returned data
	// As we loop, we open files and read them. These variables record the currently
	// loaded file.
	var lines [][]byte
	var lastFile string
	for i := skip; ; i++ { // Skip the expected number of frames
		pc, file, line, ok := runtime.Caller(i)
		if !ok {
			break
		}
		// Print this much at least.  If we can't find the source, it won't show.
		fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
		if file != lastFile {
			data, err := ioutil.ReadFile(file)
			if err != nil {
				continue
			}
			lines = bytes.Split(data, []byte{'\n'})
			lastFile = file
		}
		fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
	}
	return buf.Bytes()
}

在n9e中,controller中,使用了自定义错误PageError

type PageError struct {
	Message string
}

func (p PageError) Error() string {
	return p.Message
}

func (p PageError) String() string {
	return p.Message
}

func Bomb(format string, a ...interface{}) {
	panic(PageError{Message: fmt.Sprintf(format, a...)})
}

func Dangerous(v interface{}) {
	if v == nil {
		return
	}

	switch t := v.(type) {
	case string:
		if t != "" {
			panic(PageError{Message: t})
		}
	case error:
		panic(PageError{Message: t.Error()})
	}
}

然后针对于页面异常PageErrro,统一封装返回json

如果是特定其他异常,比如BrokenPipe之类的,会做特殊处理,比如打印请求调试,打印调用栈

会话 #

session会话,为了满足http无状态而产生的一个概念,主要用于存储用户区别信息,主要有两种方式:

  1. 使用redis/mysql这类的共享存储,存于服务端
  2. 使用Cookie存于浏览器端,但是必须加密,秘钥固定不变

这类就使用了cookie加密存储session的方式

session中间件源码
// 初始化
store := cookie.NewStore([]byte(c.HTTP.Secret))
sessionMid := sessions.Sessions("falcon-ng", store)

gin.Use(sessionMid)

// 写入信息
session := sessions.Default(c)
session.Set("username", user)
session.Save()

// 获取信息
value := session.Get("username")

验证 #

对于需要登录访问的接口,提供了middleware.Logined()中间件;对于内部访问的接口,提供了middleware.CheckHeaderToken()中间件,只需要在响应的接口组Use即可

内建token验证中间件源码
const internalToken = "monapi-builtin-token"

// CheckHeaderToken check thirdparty x-srv-token
func CheckHeaderToken() gin.HandlerFunc {
	return func(c *gin.Context) {
		token := c.GetHeader("x-srv-token")
		if token != internalToken {
			errors.Bomb("token[%s] invalid", token)
		}
		c.Next()
	}
}
v1 := r.Group("/v1/portal").Use(middleware.CheckHeaderToken())
{
	v1.POST("/endpoint", endpointImport)
}

仅有一处使用到该内建token,在index组件中,将endpoint存到endpoint表中。

这里为啥不使用model中的EndpointImport方法,而非要调用接口,费解,如果使用EndpointImport,这样endpoint表在哪里读写,一目了然

路由处理 #

代理 #

据透露,代理仅用于调试

func indexReq(c *gin.Context) {
	target, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", address.GetHTTPPort("index")))
	errors.Dangerous(err)

	proxy := httputil.NewSingleHostReverseProxy(target)
	c.Request.Header.Set("X-Forwarded-Host", c.Request.Header.Get("Host"))

	proxy.ServeHTTP(c.Writer, c.Request)
}

接口设计 #

尽可能满足RESTful接口设计

n9e接口源码
func Config(r *gin.Engine) {
	// 静态页面
	r.Static("/pub", "./pub")
	// 图标
	r.StaticFile("/favicon.ico", "./pub/favicon.ico")

	// 心跳检测
	hbs := r.Group("/api/hbs")
	{
		hbs.POST("/heartbeat", heartBeat)
		// 获取具体模块的所有实例,只包括judge和index
		hbs.GET("/instances", instanceGets)
	}

	// portal接口
	nolog := r.Group("/api/portal")
	{
		// 健康接口
		nolog.GET("/ping", ping)
		// 版本
		nolog.GET("/version", version)
		// pid
		nolog.GET("/pid", pid)
		// remote addr
		nolog.GET("/addr", addr)

		// 登录接口
		nolog.POST("/auth/login", login)
		// 退出登录
		nolog.GET("/auth/logout", logout)
		
		// 获取邀请token
		nolog.GET("/users/invite", userInviteGet)
		// 邀请用户注册
		nolog.POST("/users/invite", userInvitePost)

		// 获取endpoint的所有采集配置项
		nolog.GET("/collects/:endpoint", collectGetByEndpoint)

		// 获取策略
		nolog.GET("/stras/effective", effectiveStrasGet)
		nolog.GET("/stras", strasAll)
	}

	login := r.Group("/api/portal").Use(middleware.Logined())
	{
		// profile获取
		login.GET("/self/profile", selfProfileGet)
		login.PUT("/self/profile", selfProfilePut)
		login.PUT("/self/password", selfPasswordPut)

		// 用户列表
		login.GET("/user", userListGet)
		// 添加用户
		login.POST("/user", userAddPost)
		// 获取用户profile
		login.GET("/user/:id/profile", userProfileGet)
		// 修改profile
		login.PUT("/user/:id/profile", userProfilePut)
		// 修改密码
		login.PUT("/user/:id/password", userPasswordPut)
		// 删除用户
		login.DELETE("/user/:id", userDel)

		// 获取team
		login.GET("/team", teamListGet)
		// 添加team
		login.POST("/team", teamAddPost)
		// 更新team
		login.PUT("/team/:id", teamPut)
		// 删除team
		login.DELETE("/team/:id", teamDel)
		// 获取endpoints
		login.GET("/endpoint", endpointGets)
		// 导入endpoints
		login.POST("/endpoint", endpointImport)
		// 修改endpoint,主要是改alias
		login.PUT("/endpoint/:id", endpointPut)
		// 删除endpoint
		login.DELETE("/endpoint", endpointDel)

		// 根据idents获取对应绑定的节点
		login.GET("/endpoints/bindings", endpointBindingsGet)
		// 根据节点id获取endpoints
		login.GET("/endpoints/bynodeids", endpointByNodeIdsGets)

		// 获取tree信息
		login.GET("/tree", treeGet)
		// 搜索tree获取节点
		login.GET("/tree/search", treeSearchGet)
		// 添加节点
		login.POST("/node", nodePost)
		// 修改节点名字
		login.PUT("/node/:id/name", nodeNamePut)
		// 删除节点
		login.DELETE("/node/:id", nodeDel)
		// 获取node节点下的所有endpoints
		login.GET("/node/:id/endpoint", endpointsUnder)
		// 绑定节点和endpoint
		login.POST("/node/:id/endpoint-bind", endpointBind)
		// 解绑节点和endpoint
		login.POST("/node/:id/endpoint-unbind", endpointUnbind)
		// 获取屏蔽配置
		login.GET("/node/:id/maskconf", maskconfGets)
		// 获取screen
		login.GET("/node/:id/screen", screenGets)
		// 添加screen
		login.POST("/node/:id/screen", screenPost)

		// 节点搜索
		login.GET("/nodes/search", nodeSearchGet)
		// 获取节点叶子节点
		login.GET("/nodes/leafids", nodeLeafIdsGet)
		// 获取节点的所有父节点
		login.GET("/nodes/pids", nodePidsGet)
		// 获取所有节点
		login.GET("/nodes/byids", nodesByIdsGets)

		// 添加屏蔽
		login.POST("/maskconf", maskconfPost)
		// 修改屏蔽
		login.PUT("/maskconf/:id", maskconfPut)
		// 删除屏蔽
		login.DELETE("/maskconf/:id", maskconfDel)

		// 修改screen
		login.PUT("/screen/:id", screenPut)
		// 删除screen
		login.DELETE("/screen/:id", screenDel)
		// 获取某screen下的所有subclass
		login.GET("/screen/:id/subclass", screenSubclassGets)
		// 添加某screen下的subclass
		login.POST("/screen/:id/subclass", screenSubclassPost)

		// 修改subclass
		login.PUT("/subclass", screenSubclassPut)
		// 删除subclass
		login.DELETE("/subclass/:id", screenSubclassDel)
		// 获取某subclass下的所有chart
		login.GET("/subclass/:id/chart", chartGets)
		// 在某subclass下添加chart
		login.POST("/subclass/:id/chart", chartPost)
		// 修改subclass的screen
		login.PUT("/subclasses/loc", screenSubclassLocPut)
		// 修改chart
		login.PUT("/chart/:id", chartPut)
		// 删除chart
		login.DELETE("/chart/:id", chartDel)
		// 修改charts的权重,该权重用于排序
		login.PUT("/charts/weights", chartWeightsPut)
		
		// 图表调试用
		login.GET("/tmpchart", tmpChartGet)
		login.POST("/tmpchart", tmpChartPost)

		// 报警==事件
		// 事件获取
		login.GET("/event/cur", eventCurGets)
		// 根据id获取事件
		login.GET("/event/cur/:id", eventCurGetById)
		// 删除事件
		login.DELETE("/event/cur/:id", eventCurDel)
		// 获取事件历史
		login.GET("/event/his", eventHisGets)
		// 根据id获取事件历史
		login.GET("/event/his/:id", eventHisGetById)
		// 添加认领人
		login.POST("/event/curs/claim", eventCurClaim)
		// 添加采集信息
		login.POST("/collect", collectPost)
		// 获取采集信息列表
		login.GET("/collect/list", collectsGet)
		// 获取采集信息列表
		login.GET("/collect", collectGet)
		// 修改采集项
		login.PUT("/collect", collectPut)
		// 删除采集项
		login.DELETE("/collect", collectsDel)
		// 采集项正则检测
		login.POST("/collect/check", regExpCheck)

		// 添加策略
		login.POST("/stra", straPost)
		// 修改策略
		login.PUT("/stra", straPut)
		// 删除策略
		login.DELETE("/stra", strasDel)
		// 获取策略
		login.GET("/stra", strasGet)
		// 获取某个策略
		login.GET("/stra/:sid", straGet)
	}

	
	v1 := r.Group("/v1/portal").Use(middleware.CheckHeaderToken())
	{	
		// 导入endpoint
		v1.POST("/endpoint", endpointImport)
	}

	// 本地调试用
	transferProxy := r.Group("/api/transfer")
	{
		transferProxy.GET("/req", transferReq)
		transferProxy.POST("/data", transferReq)
		transferProxy.POST("/data/ui", transferReq)
		transferProxy.POST("/push", transferReq)
	}

	// 本地调试用
	indexProxy := r.Group("/api/index")
	{
		indexProxy.POST("/metrics", indexReq)
		indexProxy.POST("/tagkv", indexReq)
		indexProxy.POST("/counter/fullmatch", indexReq)
		indexProxy.POST("/counter/clude", indexReq)
		indexProxy.POST("/counter/detail", indexReq)
	}
}

各类资源通用模式/模板 #

  • 添加资源 POST /api/portal/res
  • 获取列表 GET /api/portal/res
  • 获取某个资源 GET /api/portal/res/:id
  • 更新某个资源 PUT /api/portal/res/:id
  • 更新某个资源的某一项 PUT /api/portal/res/:id/item
  • 获取子资源 GET /api/portal/res/:id/subres/:subid
  • 添加子资源 POST /api/portal/res/:id/subres/:subid
  • 修改子资源 PUT /api/portal/subres/:subid
  • 删除子资源 DELETE /api/portal/subres/:subid

附录 #

信号相关 #

SIGINT,SIGTERM,SIGQUITSIGHUP默认行为都是杀掉进程,但是用户可以拦截他们,并这些信号添加自己的处理逻辑,所以不同的应用需要小心嘞,它可能不是系统默认行为。

必杀技!从不失败(但是针对一些进程状态异常的,可能有问题),这个信号很危险,不会给进程反应的时间,会导致一些资源没法清理,比如缓存目录,正在处理的连接等
相当弱的一个信号,传统的意义是:停止正在做的,等待用户后续输入。在终端中通常由Ctrl+C产生;在非交互式的程序中,跟SIGTERM类似,就是杀死进程。
这是最正常的杀进程信号,它通知应用进程干净地死去,应用进程可能会处理一些后事,比如保存状态,释放资源等;但是特别重要的进程也会选择忽略SIGTERM信号。
SIGHUP is about the same as SIGTERM in terms of harshness, but it has a specific role because it’s automatically sent to applications running in a terminal when the user disconnects from that terminal (etymologically, because the user was connecting via a telephone line and the modem hung up). SIGHUP is often involuntary, unlike SIGTERM which has to be sent explicitly, so applications should try to save their state on a SIGHUP. SIGHUP also has a completely different conventional meaning for non-user-facing applications (daemons), which is to reload their configuration file.
SIGQUIT is the harshest of the ignorable signals. It’s meant to be used when an application is misbehaving and needs to be killed now, and by default it traditionally left a core dump file (modern systems where most users wouldn’t know what a core file is tend to not produce them by default). Applications can set a handler but should do very little (in particular not save any state) because the intent of SIGQUIT is to be used when something is seriously wrong.

颜色相关 #

参考: ANSI转义序列

在bash中使用颜色:

echo -e "\033[32m绿色字\033[0m"

\033是八进制表示2,或者十六进制\x1b表示,或者字节数组29表示,在ASCII中代表ESC,是不可打印的字符

ANSI控制序列三部分构成: 前置引导CSI控制序列结束符号

CSI控制序列主要是控制输出样式的,比如颜色, 光标等… ,格式为:[<PREFIX>];[<COLOR>];[<TEXT DECORATION>],其中:

  • PREFIX: 使用的 256的颜色模式,后面将介绍。
  • COLOR: 输出颜色,前景色等
  • TEXT DECORATION: 文字装饰器,比如下划线等(1粗2深3下划线4斜体42背景绿)

n9e中用字节表示:

/* 27 -> \33
91 -> [
57 -> 9
55 -> 7
59 -> ;
52 -> 4
50 -> 2
109 -> m
*/
// 代表绿底白字
[]byte{27, 91, 57, 55, 59, 52, 50, 109}