From e398f7296f8c97d8ee18225d5493501799d499f7 Mon Sep 17 00:00:00 2001 From: yxh Date: Fri, 11 Aug 2023 18:00:35 +0800 Subject: [PATCH] =?UTF-8?q?add=20=E6=B7=BB=E5=8A=A0=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 +- go.mod | 27 +- go.sum | 66 +- internal/app/boot/boot.go | 1 + internal/app/common/logic/captcha/captcha.go | 17 +- internal/app/mqueue/.gitignore | 1 + internal/app/mqueue/config/config.yaml | 21 + internal/app/mqueue/consts/message.go | 86 ++ internal/app/mqueue/controller/demo.go | 76 ++ internal/app/mqueue/driver/diskqueue.go | 757 ++++++++++++++++ internal/app/mqueue/driver/diskqueue_test.go | 826 ++++++++++++++++++ internal/app/mqueue/logic/diskqueue/client.go | 248 ++++++ .../app/mqueue/logic/diskqueue/consumer.go | 49 ++ .../app/mqueue/logic/diskqueue/producer.go | 55 ++ internal/app/mqueue/logic/logic.go | 11 + internal/app/mqueue/logic/mqueue/mqueue.go | 197 +++++ internal/app/mqueue/logic/nsq/consumer.go | 90 ++ internal/app/mqueue/logic/nsq/producer.go | 67 ++ .../app/mqueue/logic/rocketmq/consumer.go | 71 ++ .../app/mqueue/logic/rocketmq/producer.go | 87 ++ internal/app/mqueue/model/diskqueue.go | 19 + internal/app/mqueue/model/mqueue.go | 37 + internal/app/mqueue/mqueue_test.go | 168 ++++ internal/app/mqueue/router/demo.go | 25 + internal/app/mqueue/service/mqueue.go | 50 ++ manifest/config/config.yaml.bak | 24 +- resource/template/vm/vue/edit-vue.template | 5 + 27 files changed, 3063 insertions(+), 22 deletions(-) create mode 100644 internal/app/mqueue/.gitignore create mode 100644 internal/app/mqueue/config/config.yaml create mode 100644 internal/app/mqueue/consts/message.go create mode 100644 internal/app/mqueue/controller/demo.go create mode 100644 internal/app/mqueue/driver/diskqueue.go create mode 100644 internal/app/mqueue/driver/diskqueue_test.go create mode 100644 internal/app/mqueue/logic/diskqueue/client.go create mode 100644 internal/app/mqueue/logic/diskqueue/consumer.go create mode 100644 internal/app/mqueue/logic/diskqueue/producer.go create mode 100644 internal/app/mqueue/logic/logic.go create mode 100644 internal/app/mqueue/logic/mqueue/mqueue.go create mode 100644 internal/app/mqueue/logic/nsq/consumer.go create mode 100644 internal/app/mqueue/logic/nsq/producer.go create mode 100644 internal/app/mqueue/logic/rocketmq/consumer.go create mode 100644 internal/app/mqueue/logic/rocketmq/producer.go create mode 100644 internal/app/mqueue/model/diskqueue.go create mode 100644 internal/app/mqueue/model/mqueue.go create mode 100644 internal/app/mqueue/mqueue_test.go create mode 100644 internal/app/mqueue/router/demo.go create mode 100644 internal/app/mqueue/service/mqueue.go diff --git a/.gitignore b/.gitignore index deab41c..e35126b 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,6 @@ pkg/ bin/ cbuild **/.DS_Store -.vscode/ .test/ main output/ @@ -21,9 +20,10 @@ manifest/output/ *.exe tmp/ resource/data/gen_sql +resource/data/storage resource/log/ resource/public/big_file resource/public/upload_file manifest/config/config.yaml GFastV3 -/manifest/config/db.yaml +/manifest/config/db.yaml \ No newline at end of file diff --git a/go.mod b/go.mod index 111e3f7..735e1d8 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,15 @@ module github.com/tiger1103/gfast/v3 go 1.18 require ( + github.com/apache/rocketmq-client-go/v2 v2.1.1 github.com/casbin/casbin/v2 v2.42.0 github.com/gogf/gf/contrib/drivers/mysql/v2 v2.2.5 github.com/gogf/gf/contrib/nosql/redis/v2 v2.3.0 github.com/gogf/gf/v2 v2.4.1 + github.com/gorilla/websocket v1.5.0 github.com/mojocn/base64Captcha v1.3.5 github.com/mssola/user_agent v0.5.3 + github.com/nsqio/go-nsq v1.1.0 github.com/qiniu/go-sdk/v7 v7.13.0 github.com/shirou/gopsutil/v3 v3.23.2 github.com/tencentyun/cos-go-sdk-v5 v0.7.34 @@ -23,6 +26,7 @@ require ( github.com/clbanning/mxj v1.8.4 // indirect github.com/clbanning/mxj/v2 v2.5.7 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/emirpasic/gods v1.12.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect @@ -32,29 +36,48 @@ require ( github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/golang/mock v1.4.4 // indirect + github.com/golang/snappy v0.0.1 // indirect github.com/google/go-querystring v1.0.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/grokify/html-strip-tags-go v0.0.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mozillazg/go-httpheader v0.2.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/pkg/errors v0.8.1 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rivo/uniseg v0.4.3 // indirect + github.com/satori/go.uuid v1.2.0 // indirect + github.com/sirupsen/logrus v1.4.0 // indirect + github.com/stathat/consistent v1.0.0 // indirect + github.com/tidwall/gjson v1.13.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opentelemetry.io/otel v1.11.2 // indirect go.opentelemetry.io/otel/sdk v1.11.2 // indirect go.opentelemetry.io/otel/trace v1.11.2 // indirect + go.uber.org/atomic v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/image v0.0.0-20190501045829-6d32002ffd75 // indirect + golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect golang.org/x/net v0.7.0 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.5.0 // indirect + golang.org/x/term v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect + golang.org/x/tools v0.1.12 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index bb220e7..4990ff0 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= +github.com/apache/rocketmq-client-go/v2 v2.1.1 h1:WY/LkOYSQaVyV+HOqdiIgF4LE3beZ/jwdSLKZlzpabw= +github.com/apache/rocketmq-client-go/v2 v2.1.1/go.mod h1:GZzExtXY9zpI6FfiVJYAhw2IXQtgnHUuWpULo7nr5lw= github.com/casbin/casbin/v2 v2.42.0 h1:EA0aE5PZnFSYY6WulzTScOo4YO6xrGAAZkXRLs8p2ME= github.com/casbin/casbin/v2 v2.42.0/go.mod h1:sEL80qBYTbd+BPeL4iyvwYzFT3qwLaESq5aFKVLbLfA= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -24,6 +26,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -63,6 +67,7 @@ github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQA github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -74,6 +79,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -84,9 +91,12 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -94,6 +104,12 @@ github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -120,12 +136,18 @@ github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWV github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mojocn/base64Captcha v1.3.5 h1:Qeilr7Ta6eDtG4S+tQuZ5+hO+QHbiGAJdi4PfoagaA0= github.com/mojocn/base64Captcha v1.3.5/go.mod h1:/tTTXn4WTpX9CfrmipqRytCpJ27Uw3G6I7NcP2WwcmY= github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ= github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60= github.com/mssola/user_agent v0.5.3 h1:lBRPML9mdFuIZgI2cmlQ+atbpJdLdeVl2IDodjBR578= github.com/mssola/user_agent v0.5.3/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -143,7 +165,11 @@ github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= @@ -158,11 +184,23 @@ github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shirou/gopsutil/v3 v3.23.2 h1:PAWSuiAszn7IhPMBtXsbSCafej7PqUOvY6YywlQUExU= github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80RyZJ7V4Th1M= +github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -175,6 +213,12 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.194/go.mod github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.194/go.mod h1:yrBKWhChnDqNz1xuXdSbWXG56XawEq0G5j1lg4VwBD4= github.com/tencentyun/cos-go-sdk-v5 v0.7.34 h1:xm+Pg+6m486y4eugRI7/E4WasbVmpY1hp9QBSRErgp8= github.com/tencentyun/cos-go-sdk-v5 v0.7.34/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= +github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= +github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tiger1103/gfast-cache v1.0.0 h1:+amboC6uu4AvkUnDz4DECcsBTp5HW+O98k8guJtrDlA= github.com/tiger1103/gfast-cache v1.0.0/go.mod h1:l+e5vdUHmqK0Th5VBOCSxXORbm8MwZQMXDkn+KA+amE= github.com/tiger1103/gfast-token v1.0.3 h1:6uPGGuhxlLODV9tDS1djhWHUSaIYtVNyOqibHTITCt4= @@ -199,13 +243,19 @@ go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH81 go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= +go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM= +go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/image v0.0.0-20190501045829-6d32002ffd75 h1:TbGuee8sSq15Iguxu4deQ7+Bqq/d2rsQejGcEtADAMQ= golang.org/x/image v0.0.0-20190501045829-6d32002ffd75/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -224,8 +274,10 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -249,12 +301,15 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211020174200-9d6173849985/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -264,10 +319,15 @@ golang.org/x/text v0.3.8-0.20211105212822-18b340fc7af2/go.mod h1:EFNZuWvGYxIRUEX golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -286,6 +346,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -297,3 +359,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/app/boot/boot.go b/internal/app/boot/boot.go index 799ce27..fabed2a 100644 --- a/internal/app/boot/boot.go +++ b/internal/app/boot/boot.go @@ -9,5 +9,6 @@ package boot import ( _ "github.com/tiger1103/gfast/v3/internal/app/common/logic" + _ "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic" _ "github.com/tiger1103/gfast/v3/internal/app/system/logic" ) diff --git a/internal/app/common/logic/captcha/captcha.go b/internal/app/common/logic/captcha/captcha.go index bfc690a..1bc01c2 100644 --- a/internal/app/common/logic/captcha/captcha.go +++ b/internal/app/common/logic/captcha/captcha.go @@ -18,7 +18,7 @@ func init() { service.RegisterCaptcha(New()) } -func New() *sCaptcha { +func New() service.ICaptcha { return &sCaptcha{ driver: &base64Captcha.DriverString{ Height: 80, @@ -38,21 +38,6 @@ type sCaptcha struct { store base64Captcha.Store } -var ( - captcha = sCaptcha{ - driver: &base64Captcha.DriverString{ - Height: 80, - Width: 240, - NoiseCount: 50, - ShowLineOptions: 20, - Length: 4, - Source: "abcdefghjkmnpqrstuvwxyz23456789", - Fonts: []string{"chromohv.ttf"}, - }, - store: base64Captcha.DefaultMemStore, - } -) - // GetVerifyImgString 获取字母数字混合验证码 func (s *sCaptcha) GetVerifyImgString(ctx context.Context) (idKeyC string, base64stringC string, err error) { driver := s.driver.ConvertFonts() diff --git a/internal/app/mqueue/.gitignore b/internal/app/mqueue/.gitignore new file mode 100644 index 0000000..dbca33e --- /dev/null +++ b/internal/app/mqueue/.gitignore @@ -0,0 +1 @@ +/storage diff --git a/internal/app/mqueue/config/config.yaml b/internal/app/mqueue/config/config.yaml new file mode 100644 index 0000000..2ad8970 --- /dev/null +++ b/internal/app/mqueue/config/config.yaml @@ -0,0 +1,21 @@ +# 正式项目请修改manifest/config.yaml下的配置,这个只是示例和Test使用 +mqueue: + enable: true + closeWaitTime: 5 # 关闭服务器时等待刷盘时间(秒),最好不要小于5 + driver: "diskQueue" # 驱动类型 nsq|diskQueue|rocketmq + channel: "channel" # 默认频道名称 + nsq: + address: "192.168.75.3" # nsq地址 + producer_port: 4150 # 生产者端口 + consumer_port: 4161 # 消费者http端口 + rocketmq: + nameServers: "192.168.75.3:9876" # 服务器地址 + retry: 2 # 重试次数 + logLevel: "warn" # 日志级别 debug|error|warn|fatal + diskQueue: + dataPath: "./resource/data/storage/diskqueue" # 存储路径 + maxBytesPerFile: 2097152 # 单个单件最大字节数(byte),默认:2m(2097152) + syncEvery: 2500 # 多少次读写后刷盘 + syncTimeout: 2 # 刷盘间隔(秒) + maxMsgSize: 128144 # 最大消息字节数(byte) + minMsgSize: 0 # 最小消息字节数 diff --git a/internal/app/mqueue/consts/message.go b/internal/app/mqueue/consts/message.go new file mode 100644 index 0000000..1417265 --- /dev/null +++ b/internal/app/mqueue/consts/message.go @@ -0,0 +1,86 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/31 11:12 + */ + +package consts + +import "time" + +type SendMsgMethod int + +// MsgDelayLevel 参数1-18 对应时间: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h +type MsgDelayLevel int + +const ( + MsgIdLength = 32 + SendMsgPublish = 1 + SendMsgPublishAsync = 2 + SendMsgDelay = 3 +) + +const ( + MsgDelay1s MsgDelayLevel = iota + 1 + MsgDelay5s + MsgDelay10s + MsgDelay30s + MsgDelay1m + MsgDelay2m + MsgDelay3m + MsgDelay4m + MsgDelay5m + MsgDelay6m + MsgDelay7m + MsgDelay8m + MsgDelay9m + MsgDelay10m + MsgDelay20m + MsgDelay30m + MsgDelay1h + MsgDelay2h +) + +func (m MsgDelayLevel) Value() time.Duration { + delay := 1 + switch m { + case MsgDelay1s: + delay = 1 + case MsgDelay5s: + delay = 5 + case MsgDelay10s: + delay = 10 + case MsgDelay30s: + delay = 30 + case MsgDelay1m: + delay = 60 + case MsgDelay2m: + delay = 60 * 2 + case MsgDelay3m: + delay = 60 * 3 + case MsgDelay4m: + delay = 60 * 4 + case MsgDelay5m: + delay = 60 * 5 + case MsgDelay6m: + delay = 60 * 6 + case MsgDelay7m: + delay = 60 * 7 + case MsgDelay8m: + delay = 60 * 8 + case MsgDelay9m: + delay = 60 * 9 + case MsgDelay10m: + delay = 60 * 10 + case MsgDelay20m: + delay = 60 * 20 + case MsgDelay30m: + delay = 60 * 30 + case MsgDelay1h: + delay = 60 * 60 * 1 + case MsgDelay2h: + delay = 60 * 60 * 2 + } + return time.Duration(delay) * time.Second +} diff --git a/internal/app/mqueue/controller/demo.go b/internal/app/mqueue/controller/demo.go new file mode 100644 index 0000000..7c04804 --- /dev/null +++ b/internal/app/mqueue/controller/demo.go @@ -0,0 +1,76 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/28 14:21 + */ + +package controller + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gorilla/websocket" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" + "github.com/tiger1103/gfast/v3/library/libResponse" + "sync" +) + +type demo struct { +} + +var Demo = new(demo) + +func (c *demo) Produce(r *ghttp.Request) { + msg := &model.MQSendMsg{ + Topic: r.GetForm("topic").String(), + Body: []byte(r.GetForm("body").String()), + } + err := service.MQueue().SendMsg(msg) + if err != nil { + libResponse.FailJson(true, r, "error", err.Error()) + } + libResponse.SusJson(true, r, "success") +} + +func (c *demo) Subscribe(r *ghttp.Request) { + wg := sync.WaitGroup{} + wg.Add(1) + var err error + go func() { + topic := r.Get("topic").String() + channel := r.Get("channel").String() + ws, err := r.WebSocket() + if err != nil { + wg.Done() + return + } + err = service.MQueue().Subscribe(topic, channel, func(m *model.MQMessage) error { + fmt.Println(m) + fmt.Println(string(m.Body)) + ws.WriteMessage(websocket.TextMessage, m.Body) + return nil + }) + if err != nil { + wg.Done() + return + } + wg.Done() + for { + _, _, err = ws.ReadMessage() + if err != nil { + g.Log().Warning(context.Background(), fmt.Sprintf("取消订阅 主题:%s,频道:%s", topic, channel)) + service.MQueue().Unsubscribe(topic, channel) + return + } + } + }() + wg.Wait() + if err != nil { + libResponse.FailJson(true, r, "error", err.Error()) + } + libResponse.SusJson(true, r, "success") +} diff --git a/internal/app/mqueue/driver/diskqueue.go b/internal/app/mqueue/driver/diskqueue.go new file mode 100644 index 0000000..6019ab0 --- /dev/null +++ b/internal/app/mqueue/driver/diskqueue.go @@ -0,0 +1,757 @@ +/** + * 基于磁盘的 FIFO 先进先出消息队列 + * source nsq + * https://github.com/nsqio/go-diskqueue/blob/master/diskqueue.go + */ + +package disk + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "math/rand" + "os" + "path" + "sync" + "time" +) + +type LogLevel int + +const ( + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) +) + +type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) + +func (l LogLevel) String() string { + switch l { + case 1: + return "DEBUG" + case 2: + return "INFO" + case 3: + return "WARNING" + case 4: + return "ERROR" + case 5: + return "FATAL" + } + panic("invalid LogLevel") +} + +type IDiskQueue interface { + // Put 加锁检测 标志位是否退出, 如果否 则继续往文件写入数据并等待结果 + // 写入*.meta.dat 格式 + // 1.depth + // 2.readFileNum,readPos + // 3.writeFileNum,writePos + Put([]byte) error + // ReadChan 读取文件数据 返回只读的chan 可用于多消费者并发读取 + ReadChan() <-chan []byte + PeekChan() <-chan []byte + // Close 等待loop结束, 正常关闭 并保存元数据 + Close() error + // Delete 等待loop结束, 直接关闭io流 + Delete() error + // Depth 未读消息积压量 + Depth() int64 + // Empty 清空消息, 删除文件 + Empty() error +} + +// diskQueue 基于文件系统的先进先出队列 +type diskQueue struct { + // 64bit atomic vars need to be first for proper alignment on 32bit platforms + + // run-time state (also persisted to disk) + // 运行时的数据保存, 也会保存到文件 + readPos int64 // 文件读取的位置 + writePos int64 // 文件写入的位置 + readFileNum int64 // 读取文件编号 + writeFileNum int64 // 写入文件编号 + depth int64 // 未读消息积压量 + + sync.RWMutex // 读写锁 + + // instantiation time metadata + name string // 队列实例名称 + dataPath string // 数据文件存储路径 + maxBytesPerFile int64 // 每个文件最大字节数 + maxBytesPerFileRead int64 + minMsgSize int32 // 最小消息长度 + maxMsgSize int32 // 最大消息长度 + syncEvery int64 // 刷盘频率设置 + syncTimeout time.Duration // 刷盘时间设置(单位秒) + exitFlag int32 // 退出标志位 + needSync bool // 是否需要同步 + + // 跟踪读取的位置 + nextReadPos int64 // 下一次读取的位置 + nextReadFileNum int64 // 下一次读取对应的文件编号 + + readFile *os.File // 读取的文件 + writeFile *os.File // 写入的文件 + reader *bufio.Reader // 缓冲读取 + writeBuf bytes.Buffer // 缓冲写入 + + // 通过 ReadChan() 方法对外暴露 + readChan chan []byte // 读取的数据,可以多消费者进行通信消费 + + // 通过 PeekChan() 方法对外暴露 + peekChan chan []byte // 探查数据 + + // 内部管道 + depthChan chan int64 + writeChan chan []byte // 写入通道 + writeResponseChan chan error // 写入结果反馈通道 + emptyChan chan int // 清空队列通道 + emptyResponseChan chan error // 清空反馈通道 + exitChan chan int // 结束信号通道 + exitSyncChan chan int // 退出同步通道 + logf AppLogFunc // 日志记录封装 +} + +// NewDiskQueue 实例化一个diskQueue,从文件系统中检索元数据并启动预计程序 +func NewDiskQueue(name, dataPath string, maxBytesPerFile int64, minMsgSize, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) IDiskQueue { + d := diskQueue{ + name: name, + dataPath: dataPath, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + syncEvery: syncEvery, + syncTimeout: syncTimeout, + readChan: make(chan []byte), + peekChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + logf: logf, + } + // 从磁盘加载元数据 + err := d.retrieveMetaData() + if err != nil && !os.IsNotExist(err) { + d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) + } + go d.ioLoop() + return &d +} + +// Depth 获取消息深度(条数) +func (d *diskQueue) Depth() int64 { + depth, ok := <-d.depthChan + if !ok { + // ioLoop exited + depth = d.depth + } + return depth +} + +// ReadChan 返回一个用于读取字节型数据的通道 +func (d *diskQueue) ReadChan() <-chan []byte { + return d.readChan +} + +// PeekChan 返回一个用于探测字数据的通道,不会对影响消息队列状态 +func (d *diskQueue) PeekChan() <-chan []byte { + return d.peekChan +} + +// Put 写入字节型数据到队列中 +func (d *diskQueue) Put(data []byte) error { + d.RLock() + defer d.RUnlock() + + if d.exitFlag == 1 { + return errors.New("exiting") + } + + d.writeChan <- data + return <-d.writeResponseChan +} + +// Close cleans up the queue and persists metadata +// 清理队列并持久化元数据 +func (d *diskQueue) Close() error { + err := d.exit(false) + if err != nil { + return err + } + return d.sync() +} + +func (d *diskQueue) Delete() error { + return d.exit(true) +} + +func (d *diskQueue) exit(deleted bool) error { + d.Lock() + defer d.Unlock() + + d.exitFlag = 1 + + if deleted { + d.logf(INFO, "DISKQUEUE(%s): deleting", d.name) + } else { + d.logf(INFO, "DISKQUEUE(%s): closing", d.name) + } + // 关闭退出通道 + close(d.exitChan) + + // 确保ioLoop已经退出 + <-d.exitSyncChan + + // 关闭文件读写 + close(d.depthChan) + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + + return nil +} + +// Empty destructively clears out any pending data in the queue +// by fast forwarding read positions and removing intermediate files +// 破坏性的清空数据,删除文件 +func (d *diskQueue) Empty() error { + d.RLock() + defer d.RUnlock() + + if d.exitFlag == 1 { + return errors.New("exiting") + } + + d.logf(INFO, "DISKQUEUE(%s): emptying", d.name) + + d.emptyChan <- 1 + return <-d.emptyResponseChan +} + +// 删除所有文件 +func (d *diskQueue) deleteAllFiles() error { + err := d.skipToNextRWFile() + + innerErr := os.Remove(d.metaDataFileName()) + if innerErr != nil && !os.IsNotExist(innerErr) { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove metadata file - %s", d.name, innerErr) + return innerErr + } + + return err +} + +// 跳到下一个读写文件 +func (d *diskQueue) skipToNextRWFile() error { + var err error + + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + + for i := d.readFileNum; i <= d.writeFileNum; i++ { + fileName := d.fileName(i) + innerErr := os.Remove(fileName) + if innerErr != nil && !os.IsNotExist(innerErr) { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove data file - %s", d.name, innerErr) + err = innerErr + } + } + + d.writeFileNum++ + d.writePos = 0 + d.readFileNum = d.writeFileNum + d.readPos = 0 + d.nextReadFileNum = d.writeFileNum + d.nextReadPos = 0 + d.depth = 0 + + return err +} + +func (d *diskQueue) readOne() ([]byte, error) { + var err error + var msgSize int32 + + if d.readFile == nil { + curFileName := d.fileName(d.readFileNum) + d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + + d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName) + + if d.readPos > 0 { + // 设置读取偏移 + _, err = d.readFile.Seek(d.readPos, 0) + if err != nil { + d.readFile.Close() + d.readFile = nil + return nil, err + } + } + + // for "complete" files (i.e. not the "current" file), maxBytesPerFileRead + // should be initialized to the file's size, or default to maxBytesPerFile + // maxBytesPerFileRead应该初始化为文件的大小,或者默认为maxBytesPerFile + d.maxBytesPerFileRead = d.maxBytesPerFile + if d.readFileNum < d.writeFileNum { + stat, err := d.readFile.Stat() + if err == nil { + d.maxBytesPerFileRead = stat.Size() + } + } + + d.reader = bufio.NewReader(d.readFile) + } + + err = binary.Read(d.reader, binary.BigEndian, &msgSize) + if err != nil { + d.readFile.Close() + d.readFile = nil + return nil, err + } + + if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { + // this file is corrupt and we have no reasonable guarantee on + // where a new message should begin + // 文件损坏,无法读取 + d.readFile.Close() + d.readFile = nil + return nil, fmt.Errorf("invalid message read size (%d)", msgSize) + } + + readBuf := make([]byte, msgSize) + _, err = io.ReadFull(d.reader, readBuf) + if err != nil { + d.readFile.Close() + d.readFile = nil + return nil, err + } + + totalBytes := int64(4 + msgSize) + + // we only advance next* because we have not yet sent this to consumers + // (where readFileNum, readPos will actually be advanced) + d.nextReadPos = d.readPos + totalBytes + d.nextReadFileNum = d.readFileNum + + // we only consider rotating if we're reading a "complete" file + // and since we cannot know the size at which it was rotated, we + // rely on maxBytesPerFileRead rather than maxBytesPerFile + // if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { + if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + + d.nextReadFileNum++ + d.nextReadPos = 0 + } + + return readBuf, nil +} + +// writeOne performs a low level filesystem write for a single []byte +// while advancing write positions and rolling files, if necessary +func (d *diskQueue) writeOne(data []byte) error { + var err error + + dataLen := int32(len(data)) + totalBytes := int64(4 + dataLen) + + if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { + return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) + } + + // will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max + // 如果文件大小超过了设置的最大值,关闭当前写入文件 + if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { + if d.readFileNum == d.writeFileNum { + d.maxBytesPerFileRead = d.writePos + } + + d.writeFileNum++ + d.writePos = 0 + + // sync every time we start writing to a new file + err = d.sync() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) + } + + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } + + if d.writeFile == nil { + curFileName := d.fileName(d.writeFileNum) + d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + + d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) + + if d.writePos > 0 { + _, err = d.writeFile.Seek(d.writePos, 0) + if err != nil { + d.writeFile.Close() + d.writeFile = nil + return err + } + } + } + + d.writeBuf.Reset() + // 缓冲区写入二进制数据 + err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) + if err != nil { + return err + } + + _, err = d.writeBuf.Write(data) + if err != nil { + return err + } + + // only write to the file once + // 写入到文件 + _, err = d.writeFile.Write(d.writeBuf.Bytes()) + if err != nil { + d.writeFile.Close() + d.writeFile = nil + return err + } + + d.writePos += totalBytes + d.depth += 1 + + return err +} + +// sync fsyncs the current writeFile and persists metadata +// 刷盘,持久化数据 +func (d *diskQueue) sync() error { + if d.writeFile != nil { + // 数据写入磁盘 + err := d.writeFile.Sync() + if err != nil { + d.writeFile.Close() + d.writeFile = nil + return err + } + } + + // 持久化元数据 + err := d.persistMetaData() + if err != nil { + return err + } + + d.needSync = false + return nil +} + +// retrieveMetaData initializes state from the filesystem +// 从本地文件取回元数据 +func (d *diskQueue) retrieveMetaData() error { + var f *os.File + var err error + + // 存储路径.diskqueue.meta.dat + fileName := d.metaDataFileName() + f, err = os.OpenFile(fileName, os.O_RDONLY, 0600) + if err != nil { + return err + } + defer f.Close() + + var depth int64 + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &depth, // 待读取消息数量 + &d.readFileNum, // 待读取文件编号 + &d.readPos, // 待读取文件位置 + &d.writeFileNum, // 待写入文件编号 + &d.writePos, // 待写入文件位置 + ) + if err != nil { + return err + } + + d.depth = depth + // 更新读取位置和文件编号 + d.nextReadFileNum = d.readFileNum + d.nextReadPos = d.readPos + + // 如果元数据在最后一次关闭系统时没有同步, + // 那么实际文件的大小实际上可能大于writePos写入位置, + // 在这种情况下,最安全的做法是跳到下一个文件进行写操作, + // 并让读取器从磁盘队列中的消息中尽可能地抢救出元数据(可能也是过时的readPos)之外的内容 + fileName = d.fileName(d.writeFileNum) + fileInfo, err := os.Stat(fileName) + if err != nil { + return err + } + fileSize := fileInfo.Size() + if d.writePos < fileSize { + d.logf(WARN, "DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file", d.name, fileName, d.writePos, fileSize) + d.writeFileNum += 1 + d.writePos = 0 + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } + return nil +} + +// persistMetaData atomically writes state to the filesystem +// 同步元数据到本地文件 +func (d *diskQueue) persistMetaData() error { + var f *os.File + var err error + // metaDat 临时文件 + fileName := d.metaDataFileName() + tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) + + // 写入到临时文件 + f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + + _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, d.readFileNum, d.readPos, d.writeFileNum, d.writePos) + if err != nil { + f.Close() + return err + } + f.Sync() + f.Close() + // 成功往临时文件写入数据, 在进行替换源文件 + // atomically rename + return os.Rename(tmpFileName, fileName) +} + +// 获取元数据文件名称 +func (d *diskQueue) metaDataFileName() string { + return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name) +} + +// 获取文件名称 +func (d *diskQueue) fileName(fileNum int64) string { + return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) +} + +func (d *diskQueue) checkTailCorruption(depth int64) { + if d.readFileNum < d.writeFileNum || d.readPos < d.writePos { + return + } + + // we've reached the end of the diskqueue + // if depth isn't 0 something went wrong + if depth != 0 { + if depth < 0 { + d.logf(ERROR, "DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...", d.name, depth) + } else if depth > 0 { + d.logf(ERROR, "DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...", d.name, depth) + } + // 强制设置为0 + d.depth = 0 + d.needSync = true + } + + if d.readFileNum != d.writeFileNum || d.readPos != d.writePos { + if d.readFileNum > d.writeFileNum { + d.logf(ERROR, "DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readFileNum, d.writeFileNum) + } + if d.readPos > d.writePos { + d.logf(ERROR, "DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readPos, d.writePos) + } + + // 强制跳到下一个文件 + d.skipToNextRWFile() + d.needSync = true + } +} + +func (d *diskQueue) moveForward() { + oldReadFileNum := d.readFileNum + d.readFileNum = d.nextReadFileNum + d.readPos = d.nextReadPos + d.depth -= 1 + + //see if we need to clean up the old file + if oldReadFileNum != d.nextReadFileNum { + // sync every time we start reading from a new file + // 每次读取新文件时同步 + d.needSync = true + + fileName := d.fileName(oldReadFileNum) + if d.writeFile != nil && d.writeFile.Name() == fileName { + d.writeFile.Close() + d.writeFile = nil + } + err := os.Remove(fileName) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fileName, err) + } + } + d.checkTailCorruption(d.depth) +} + +func (d *diskQueue) handleReadEOFError() { + fileName := d.fileName(d.readFileNum) + err := os.Remove(fileName) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fileName, err) + } + d.readFileNum++ + d.readPos = 0 + d.nextReadFileNum++ + d.nextReadPos = 0 +} + +// 处理读取错误 +func (d *diskQueue) handleReadError() { + // 跳转到下一个读取文件并重命名当前(损坏的)文件 + if d.readFileNum == d.writeFileNum { + // 如果你不能正确地从当前的写文件中读取,那就应该跳过当前的文件 + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + d.writeFileNum++ + d.writePos = 0 + } + badFileName := d.fileName(d.readFileNum) + badRenameFilename := badFileName + ".bad" + d.logf(WARN, "DISKQUEUE(%s) jump to next file and saving bad file as %s", d.name, badRenameFilename) + err := os.Rename(badFileName, badRenameFilename) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s", d.name, badFileName, badRenameFilename) + } + d.readFileNum++ + d.readPos = 0 + d.nextReadFileNum = d.readFileNum + d.nextReadPos = 0 + + // 重大状态变更,在下一次迭代中安排同步 + d.needSync = true + + d.checkTailCorruption(d.depth) +} + +// ioLoop 提供了暴露go通道的后端(通过ReadChan()),以支持多个并发队列消费者 +// 它的工作原理是基于队列是否有数据可读和阻塞,直到数据通过适当的go通道读取或写入 +// 简单来讲,这也意味着我们正在异步地从文件系统读取数据 +func (d *diskQueue) ioLoop() { + var ( + dataRead []byte + err error + count int64 + r chan []byte + p chan []byte + ) + // 设置定时器 + syncTicker := time.NewTicker(d.syncTimeout) + + for { + // 若到达刷盘频次,标记等待刷盘 + if count == d.syncEvery { + d.needSync = true + } + + if d.needSync { + err = d.sync() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) + } + count = 0 + } + // 有可读数据,并且当前读chan的数据已经被读走,则读取下一条数据 + //if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { + if d.readFileNum < d.writeFileNum || (d.readFileNum == d.writeFileNum && d.readPos < d.writePos) { + if d.nextReadPos == d.readPos { + dataRead, err = d.readOne() + if err != nil { + if io.EOF == err { + d.logf(WARN, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) + // 读取了文件未尾,同时进行读写时可能会出现这个问题 + d.handleReadEOFError() + } else { + d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) + d.handleReadError() + } + continue + } + } + r = d.readChan + p = d.peekChan + } else { + // 如果无可读数据,那么设置 r 为nil, 防止将dataRead数据重复传入readChan中 + r = nil + p = nil + } + + // Go通道规范规定,跳过select中的nil通道操作(读或写),只有在有数据要读时,我们才将r设置为d.readChan + select { + // peekChan 不需要改变状态 + case p <- dataRead: + // readChan 需要改变状态 + case r <- dataRead: + count++ + // 如果读readChan成功,则会修改读的偏移 + d.moveForward() + case d.depthChan <- d.depth: + case <-d.emptyChan: + d.emptyResponseChan <- d.deleteAllFiles() + count = 0 + case dataWrite := <-d.writeChan: + count++ + d.writeResponseChan <- d.writeOne(dataWrite) + case <-syncTicker.C: + // 到刷盘时间,则修改needSync = true + if count == 0 { + // 没有活动时不要刷盘 + continue + } + d.needSync = true + case <-d.exitChan: + // 退出 + goto exit + } + } + +exit: + d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name) + syncTicker.Stop() + d.exitSyncChan <- 1 +} diff --git a/internal/app/mqueue/driver/diskqueue_test.go b/internal/app/mqueue/driver/diskqueue_test.go new file mode 100644 index 0000000..fe04928 --- /dev/null +++ b/internal/app/mqueue/driver/diskqueue_test.go @@ -0,0 +1,826 @@ +package disk + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "reflect" + "runtime" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +func Equal(t *testing.T, expected, actual interface{}) { + if !reflect.DeepEqual(expected, actual) { + _, file, line, _ := runtime.Caller(1) + t.Logf("\033[31m%s:%d:\n\n\t %#v (expected)\n\n\t!= %#v (actual)\033[39m\n\n", + filepath.Base(file), line, expected, actual) + t.FailNow() + } +} + +func NotEqual(t *testing.T, expected, actual interface{}) { + if reflect.DeepEqual(expected, actual) { + _, file, line, _ := runtime.Caller(1) + t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n", + filepath.Base(file), line, expected, actual) + t.FailNow() + } +} + +func Nil(t *testing.T, object interface{}) { + if !isNil(object) { + _, file, line, _ := runtime.Caller(1) + t.Logf("\033[31m%s:%d:\n\n\t (expected)\n\n\t!= %#v (actual)\033[39m\n\n", + filepath.Base(file), line, object) + t.FailNow() + } +} + +func NotNil(t *testing.T, object interface{}) { + if isNil(object) { + _, file, line, _ := runtime.Caller(1) + t.Logf("\033[31m%s:%d:\n\n\tExpected value not to be \033[39m\n\n", + filepath.Base(file), line) + t.FailNow() + } +} + +func isNil(object interface{}) bool { + if object == nil { + return true + } + + value := reflect.ValueOf(object) + kind := value.Kind() + if kind >= reflect.Chan && kind <= reflect.Slice && value.IsNil() { + return true + } + + return false +} + +type tbLog interface { + Log(...interface{}) +} + +func NewTestLogger(tbl tbLog) AppLogFunc { + return func(lvl LogLevel, f string, args ...interface{}) { + tbl.Log(fmt.Sprintf(lvl.String()+": "+f, args...)) + } +} + +func TestDiskQueue(t *testing.T) { + l := NewTestLogger(t) + + dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewDiskQueue(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + msg := []byte("test") + err = dq.Put(msg) + Nil(t, err) + Equal(t, int64(1), dq.Depth()) + + msgOut := <-dq.ReadChan() + Equal(t, msg, msgOut) +} + +func TestDiskQueueRoll(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} + //ml := int64(len(msg)) + dq := NewDiskQueue(dqName, tmpDir, 256144, 0, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + num := 100000 + go func() { + for i := 0; i < num; i++ { + dq.Put(msg) + //Nil(t, err) + //Equal(t, int64(i+1), dq.Depth()) + } + }() + + //Equal(t, int64(1), dq.(*diskQueue).writeFileNum) + //Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + rnum := 0 + go func() { + for i := num; i > 0; i-- { + <-dq.ReadChan() + rnum++ + //Equal(t, int64(i-1), dq.Depth()) + } + }() + //for i := num; i > 0; i-- { + // Equal(t, msg, <-dq.ReadChan()) + // Equal(t, int64(i-1), dq.Depth()) + //} + time.Sleep(5 * time.Second) + fmt.Println(rnum) +} + +func TestDiskQueuePeek(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + ml := int64(len(msg)) + dq := NewDiskQueue(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + t.Run("roll", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("peek-read", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("read-peek", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 1; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + +} + +func assertFileNotExist(t *testing.T, fn string) { + f, err := os.OpenFile(fn, os.O_RDONLY, 0600) + Equal(t, (*os.File)(nil), f) + Equal(t, true, os.IsNotExist(err)) +} + +func TestDiskQueueEmpty(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + dq := NewDiskQueue(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + for i := 0; i < 100; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 0; i < 3; i++ { + <-dq.ReadChan() + } + + for { + if dq.Depth() == 97 { + break + } + time.Sleep(50 * time.Millisecond) + } + Equal(t, int64(97), dq.Depth()) + + numFiles := dq.(*diskQueue).writeFileNum + dq.Empty() + + assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) + for i := int64(0); i <= numFiles; i++ { + assertFileNotExist(t, dq.(*diskQueue).fileName(i)) + } + Equal(t, int64(0), dq.Depth()) + Equal(t, dq.(*diskQueue).writeFileNum, dq.(*diskQueue).readFileNum) + Equal(t, dq.(*diskQueue).writePos, dq.(*diskQueue).readPos) + Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).nextReadPos) + Equal(t, dq.(*diskQueue).readFileNum, dq.(*diskQueue).nextReadFileNum) + + for i := 0; i < 100; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 0; i < 100; i++ { + <-dq.ReadChan() + } + + for { + if dq.Depth() == 0 { + break + } + time.Sleep(50 * time.Millisecond) + } + + Equal(t, int64(0), dq.Depth()) + Equal(t, dq.(*diskQueue).writeFileNum, dq.(*diskQueue).readFileNum) + Equal(t, dq.(*diskQueue).writePos, dq.(*diskQueue).readPos) + Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).nextReadPos) +} + +func TestDiskQueueCorruption(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + // require a non-zero message length for the corrupt (len 0) test below + dq := NewDiskQueue(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) + defer dq.Close() + + msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file + msg[0] = 91 + msg[62] = 4 + msg[119] = 211 + + for i := 0; i < 25; i++ { + dq.Put(msg) + } + + Equal(t, int64(25), dq.Depth()) + + // corrupt the 2nd file + dqFn := dq.(*diskQueue).fileName(1) + os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted + + for i := 0; i < 19; i++ { // 1 message leftover in 4th file + Equal(t, msg, <-dq.ReadChan()) + } + + // corrupt the 4th (current) file + dqFn = dq.(*diskQueue).fileName(3) + os.Truncate(dqFn, 100) + + dq.Put(msg) // in 5th file + + Equal(t, msg, <-dq.ReadChan()) + + // write a corrupt (len 0) message at the 5th (current) file + dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0}) + + // force a new 6th file - put into 5th, then readOne errors, then put into 6th + dq.Put(msg) + dq.Put(msg) + + Equal(t, msg, <-dq.ReadChan()) + + dq.Put(msg) + dq.Put(msg) + // corrupt the last file + dqFn = dq.(*diskQueue).fileName(5) + os.Truncate(dqFn, 100) + + Equal(t, int64(2), dq.Depth()) + + // return one message and try reading again from corrupted file + <-dq.ReadChan() + + // give diskqueue time to handle read error + time.Sleep(50 * time.Millisecond) + + // the last log file is now considered corrupted leaving no more log messages + Equal(t, int64(0), dq.Depth()) +} + +type md struct { + depth int64 + readFileNum int64 + writeFileNum int64 + readPos int64 + writePos int64 +} + +func readMetaDataFile(fileName string, retried int) md { + f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) + if err != nil { + // provide a simple retry that results in up to + // another 500ms for the file to be written. + if retried < 9 { + retried++ + time.Sleep(50 * time.Millisecond) + return readMetaDataFile(fileName, retried) + } + panic(err) + } + defer f.Close() + + var ret md + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readPos, + &ret.writeFileNum, &ret.writePos) + if err != nil { + panic(err) + } + return ret +} + +func TestDiskQueueSyncAfterRead(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewDiskQueue(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, 1000) + dq.Put(msg) + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 0 && + d.writePos == 1004 { + // success + goto next + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +next: + dq.Put(msg) + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 1004 && + d.writePos == 2008 { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskQueueTorture(t *testing.T) { + var wg sync.WaitGroup + + l := NewTestLogger(t) + dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + msg := []byte("aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff") + + numWriters := 4 + numReaders := 4 + readExitChan := make(chan int) + writeExitChan := make(chan int) + + var depth int64 + for i := 0; i < numWriters; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + time.Sleep(100000 * time.Nanosecond) + select { + case <-writeExitChan: + return + default: + err := dq.Put(msg) + if err == nil { + atomic.AddInt64(&depth, 1) + } + } + } + }() + } + + time.Sleep(1 * time.Second) + + dq.Close() + + t.Logf("closing writeExitChan") + close(writeExitChan) + wg.Wait() + + t.Logf("restarting diskqueue") + + dq = NewDiskQueue(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, depth, dq.Depth()) + + var read int64 + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + time.Sleep(100000 * time.Nanosecond) + select { + case m := <-dq.ReadChan(): + Equal(t, m, msg) + atomic.AddInt64(&read, 1) + case <-readExitChan: + return + } + } + }() + } + + t.Logf("waiting for depth 0") + for { + if dq.Depth() == 0 { + break + } + time.Sleep(50 * time.Millisecond) + } + + t.Logf("closing readExitChan") + close(readExitChan) + wg.Wait() + + Equal(t, depth, read) +} + +func TestDiskQueueResize(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_resize" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + ml := int64(len(msg)) + dq := NewDiskQueue(dqName, tmpDir, 8*(ml+4), int32(ml), 1<<10, 2500, time.Second, l) + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + for i := 0; i < 9; i++ { + msg[0] = byte(i) + err := dq.Put(msg) + Nil(t, err) + } + Equal(t, int64(1), dq.(*diskQueue).writeFileNum) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(9), dq.Depth()) + + dq.Close() + dq = NewDiskQueue(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l) + + for i := 0; i < 10; i++ { + msg[0] = byte(20 + i) + err := dq.Put(msg) + Nil(t, err) + } + Equal(t, int64(2), dq.(*diskQueue).writeFileNum) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(19), dq.Depth()) + + for i := 0; i < 9; i++ { + msg[0] = byte(i) + Equal(t, msg, <-dq.ReadChan()) + } + for i := 0; i < 10; i++ { + msg[0] = byte(20 + i) + Equal(t, msg, <-dq.ReadChan()) + } + Equal(t, int64(0), dq.Depth()) + dq.Close() + + // make sure there aren't "bad" files due to read logic errors + files, err := filepath.Glob(filepath.Join(tmpDir, dqName+"*.bad")) + Nil(t, err) + // empty files slice is actually nil, length check is less confusing + if len(files) > 0 { + Equal(t, []string{}, files) + } +} + +func BenchmarkDiskQueuePut16(b *testing.B) { + benchmarkDiskQueuePut(16, b) +} +func BenchmarkDiskQueuePut64(b *testing.B) { + benchmarkDiskQueuePut(64, b) +} +func BenchmarkDiskQueuePut256(b *testing.B) { + benchmarkDiskQueuePut(256, b) +} +func BenchmarkDiskQueuePut1024(b *testing.B) { + benchmarkDiskQueuePut(1024, b) +} +func BenchmarkDiskQueuePut4096(b *testing.B) { + benchmarkDiskQueuePut(4096, b) +} +func BenchmarkDiskQueuePut16384(b *testing.B) { + benchmarkDiskQueuePut(16384, b) +} +func BenchmarkDiskQueuePut65536(b *testing.B) { + benchmarkDiskQueuePut(65536, b) +} +func BenchmarkDiskQueuePut262144(b *testing.B) { + benchmarkDiskQueuePut(262144, b) +} +func BenchmarkDiskQueuePut1048576(b *testing.B) { + benchmarkDiskQueuePut(1048576, b) +} +func benchmarkDiskQueuePut(size int64, b *testing.B) { + b.StopTimer() + l := NewTestLogger(b) + dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewDiskQueue(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l) + defer dq.Close() + b.SetBytes(size) + data := make([]byte, size) + b.StartTimer() + + for i := 0; i < b.N; i++ { + err := dq.Put(data) + if err != nil { + panic(err) + } + } +} + +func BenchmarkDiskWrite16(b *testing.B) { + benchmarkDiskWrite(16, b) +} +func BenchmarkDiskWrite64(b *testing.B) { + benchmarkDiskWrite(64, b) +} +func BenchmarkDiskWrite256(b *testing.B) { + benchmarkDiskWrite(256, b) +} +func BenchmarkDiskWrite1024(b *testing.B) { + benchmarkDiskWrite(1024, b) +} +func BenchmarkDiskWrite4096(b *testing.B) { + benchmarkDiskWrite(4096, b) +} +func BenchmarkDiskWrite16384(b *testing.B) { + benchmarkDiskWrite(16384, b) +} +func BenchmarkDiskWrite65536(b *testing.B) { + benchmarkDiskWrite(65536, b) +} +func BenchmarkDiskWrite262144(b *testing.B) { + benchmarkDiskWrite(262144, b) +} +func BenchmarkDiskWrite1048576(b *testing.B) { + benchmarkDiskWrite(1048576, b) +} +func benchmarkDiskWrite(size int64, b *testing.B) { + b.StopTimer() + fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600) + b.SetBytes(size) + data := make([]byte, size) + b.StartTimer() + + for i := 0; i < b.N; i++ { + f.Write(data) + } + f.Sync() +} + +func BenchmarkDiskWriteBuffered16(b *testing.B) { + benchmarkDiskWriteBuffered(16, b) +} +func BenchmarkDiskWriteBuffered64(b *testing.B) { + benchmarkDiskWriteBuffered(64, b) +} +func BenchmarkDiskWriteBuffered256(b *testing.B) { + benchmarkDiskWriteBuffered(256, b) +} +func BenchmarkDiskWriteBuffered1024(b *testing.B) { + benchmarkDiskWriteBuffered(1024, b) +} +func BenchmarkDiskWriteBuffered4096(b *testing.B) { + benchmarkDiskWriteBuffered(4096, b) +} +func BenchmarkDiskWriteBuffered16384(b *testing.B) { + benchmarkDiskWriteBuffered(16384, b) +} +func BenchmarkDiskWriteBuffered65536(b *testing.B) { + benchmarkDiskWriteBuffered(65536, b) +} +func BenchmarkDiskWriteBuffered262144(b *testing.B) { + benchmarkDiskWriteBuffered(262144, b) +} +func BenchmarkDiskWriteBuffered1048576(b *testing.B) { + benchmarkDiskWriteBuffered(1048576, b) +} +func benchmarkDiskWriteBuffered(size int64, b *testing.B) { + b.StopTimer() + fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600) + b.SetBytes(size) + data := make([]byte, size) + w := bufio.NewWriterSize(f, 1024*4) + b.StartTimer() + + for i := 0; i < b.N; i++ { + w.Write(data) + if i%1024 == 0 { + w.Flush() + } + } + w.Flush() + f.Sync() +} + +// you might want to run this like +// $ go test -bench=DiskQueueGet -benchtime 0.1s +// too avoid doing too many iterations. +func BenchmarkDiskQueueGet16(b *testing.B) { + benchmarkDiskQueueGet(16, b) +} +func BenchmarkDiskQueueGet64(b *testing.B) { + benchmarkDiskQueueGet(64, b) +} +func BenchmarkDiskQueueGet256(b *testing.B) { + benchmarkDiskQueueGet(256, b) +} +func BenchmarkDiskQueueGet1024(b *testing.B) { + benchmarkDiskQueueGet(1024, b) +} +func BenchmarkDiskQueueGet4096(b *testing.B) { + benchmarkDiskQueueGet(4096, b) +} +func BenchmarkDiskQueueGet16384(b *testing.B) { + benchmarkDiskQueueGet(16384, b) +} +func BenchmarkDiskQueueGet65536(b *testing.B) { + benchmarkDiskQueueGet(65536, b) +} +func BenchmarkDiskQueueGet262144(b *testing.B) { + benchmarkDiskQueueGet(262144, b) +} +func BenchmarkDiskQueueGet1048576(b *testing.B) { + benchmarkDiskQueueGet(1048576, b) +} + +func benchmarkDiskQueueGet(size int64, b *testing.B) { + b.StopTimer() + l := NewTestLogger(b) + dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewDiskQueue(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l) + defer dq.Close() + b.SetBytes(size) + data := make([]byte, size) + for i := 0; i < b.N; i++ { + dq.Put(data) + } + b.StartTimer() + + for i := 0; i < b.N; i++ { + <-dq.ReadChan() + } +} + +func TestDiskQueueRollAsync(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} + //ml := int64(len(msg)) + dq := NewDiskQueue(dqName, tmpDir, 1024576, 0, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + num := 1000000 + go func() { + for i := 0; i < num; i++ { + err := dq.Put(msg) + Nil(t, err) + } + }() + go func() { + for i := 0; i < num; i++ { + Equal(t, msg, <-dq.ReadChan()) + } + }() + + //Equal(t, int64(1), dq.(*diskQueue).writeFileNum) + //Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + + //filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + // if strings.HasSuffix(path, ".bad") { + // t.FailNow() + // } + // + // return err + //}) + time.Sleep(30 * time.Second) +} diff --git a/internal/app/mqueue/logic/diskqueue/client.go b/internal/app/mqueue/logic/diskqueue/client.go new file mode 100644 index 0000000..00997b0 --- /dev/null +++ b/internal/app/mqueue/logic/diskqueue/client.go @@ -0,0 +1,248 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/7 14:54 + */ + +package diskqueue + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/grand" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + disk "github.com/tiger1103/gfast/v3/internal/app/mqueue/driver" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "io" + "os" + "sync" + "time" +) + +func logger(ctx context.Context) disk.AppLogFunc { + return func(lvl disk.LogLevel, f string, args ...interface{}) { + switch lvl { + case disk.FATAL: + g.Log("diskQueue").Fatal(ctx, fmt.Sprintf(f, args...)) + case disk.DEBUG: + g.Log("diskQueue").Debug(ctx, fmt.Sprintf(f, args...)) + case disk.INFO: + g.Log("diskQueue").Info(ctx, fmt.Sprintf(f, args...)) + case disk.WARN: + g.Log("diskQueue").Warning(ctx, fmt.Sprintf(f, args...)) + case disk.ERROR: + g.Log("diskQueue").Error(ctx, fmt.Sprintf(f, args...)) + } + } +} + +type client struct { + name string + diskQueue disk.IDiskQueue + channelConsumers map[string][]*consumer + exitChan chan struct{} + logger disk.AppLogFunc + sync.Mutex +} + +// 磁盘消息队列实例 +var clientMap = make(map[string]*client) +var clientMutex sync.Mutex + +// registerDiskQueue 创建磁盘队列的实例 +func registerDiskQueue(topic string) error { + if _, ok := clientMap[topic]; ok { + return errors.New(fmt.Sprintf("实例 %s 已经存在", topic)) + } + var dc *model.DiskQueueConfig + err := g.Cfg().MustGet(context.TODO(), "mqueue.diskQueue").Struct(&dc) + if err != nil { + return err + } + // 数据文件夹不存在则创建 + if stat, err2 := os.Stat(dc.DataPath); stat == nil || os.IsNotExist(err2) { + err3 := os.MkdirAll(dc.DataPath, os.ModePerm) + if err3 != nil { + return err3 + } + } + + clientMap[topic] = &client{ + name: topic, + diskQueue: disk.NewDiskQueue(topic, dc.DataPath, dc.MaxBytesPerFile, dc.MinMsgSize, dc.MaxMsgSize, dc.SyncEvery, dc.SyncTimeout*time.Second, logger(context.TODO())), + channelConsumers: make(map[string][]*consumer), + exitChan: make(chan struct{}), + logger: logger(context.TODO()), + } + // 开始消息循环 + go clientMap[topic].start() + return nil +} + +// 根据提供的topic 返回一个已注册的diskQueue实例,如果实例不存在,则创建一个返回 +func getClient(topic string) (c *client, err error) { + clientMutex.Lock() + defer clientMutex.Unlock() + if _, ok := clientMap[topic]; !ok { + err = registerDiskQueue(topic) + if err != nil { + return nil, err + } + } + return clientMap[topic], nil +} + +// RegisterConsumer 注册消费者到对应频道 +func (c *client) RegisterConsumer(channel string, consumer *consumer) { + c.Lock() + defer c.Unlock() + c.channelConsumers[channel] = append(c.channelConsumers[channel], consumer) +} + +// RemoveConsumer 移除消费者 +func (c *client) RemoveConsumer(channel, id string) { + c.Lock() + c.Unlock() + for i, ch := range c.channelConsumers[channel] { + if ch.Id == id { + if len(c.channelConsumers[channel]) <= 1 { + delete(c.channelConsumers, channel) + // 用空字符清理一下start中的ReadChan()管道 + c.diskQueue.Put([]byte("")) + } else { + c.channelConsumers[channel] = append(c.channelConsumers[channel][:i], c.channelConsumers[channel][i+1:]...) + } + } + } +} + +// 将消息MQMessage转换为[]byte +func (c *client) messageToByte(m *model.MQMessage, w io.Writer) (int64, error) { + var buf [10]byte + var total int64 + + binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) + binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) + + n, err := w.Write(buf[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.ID[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.Body) + total += int64(n) + if err != nil { + return total, err + } + + return total, nil +} + +// Publish 生产者调用此方法进行消息发布 +func (c *client) Publish(body []byte) error { + var id model.MessageID + copy(id[:], grand.B(16)) + m := c.newMessage(id, body) + var b bytes.Buffer + total, err := c.messageToByte(m, &b) + if err != nil { + return err + } + if total == 0 { + return errors.New("发布信息失败,s.WriteTo 长度为 0") + } + return c.diskQueue.Put(b.Bytes()) +} + +// start 开始投递消息给消费者 +func (c *client) start() { + for { + if len(c.channelConsumers) > 0 { + select { + case m := <-c.diskQueue.ReadChan(): + if len(m) <= 0 { + break + } + message, err := c.decodeMessage(m) + if err != nil { + c.logger(disk.ERROR, err.Error()) + } + // 消息广播到所有频道 + for _, channel := range c.channelConsumers { + // 广播到当前频道下的所有消费者 + for _, ch := range channel { + err = ch.Handler(message) + if err != nil { + c.logger(disk.ERROR, err.Error()) + } + } + } + case <-c.exitChan: + return + } + } + } +} + +// 解码消息 +// message format: +// +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... +// | (int64) || || (hex string encoded in ASCII) || (binary) +// | 8-byte || || 16-byte || N-byte +// ------------------------------------------------------------------------------------------... +// nanosecond timestamp ^^ message ID message body +// (uint16) +// 2-byte +// attempts +func (c *client) decodeMessage(b []byte) (*model.MQMessage, error) { + var msg model.MQMessage + + if len(b) < 10+consts.MsgIdLength { + return nil, errors.New("not enough data to decode valid message") + } + + msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) + msg.Attempts = binary.BigEndian.Uint16(b[8:10]) + copy(msg.ID[:], b[10:10+consts.MsgIdLength]) + msg.Body = b[10+consts.MsgIdLength:] + + return &msg, nil +} + +// newMessage 创建消息 +func (c *client) newMessage(id model.MessageID, body []byte) *model.MQMessage { + return &model.MQMessage{ + ID: id, + Body: body, + Timestamp: time.Now().UnixNano(), + } +} + +func CloseDiskQueueService() { + for _, v := range clientMap { + v.diskQueue.Close() + v.exitChan <- struct{}{} + } +} + +// Clear 清空所有磁盘文件,慎用 +func Clear() { + path := g.Cfg().MustGet(context.TODO(), "mqueue.diskQueue.dataPath").String() + err := os.RemoveAll(path) + if err != nil { + fmt.Println(err) + } +} diff --git a/internal/app/mqueue/logic/diskqueue/consumer.go b/internal/app/mqueue/logic/diskqueue/consumer.go new file mode 100644 index 0000000..231b91f --- /dev/null +++ b/internal/app/mqueue/logic/diskqueue/consumer.go @@ -0,0 +1,49 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/12 10:22 + */ + +package diskqueue + +import ( + "context" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/grand" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" +) + +type consumer struct { + Topic string + Channel string + Id string + Handler model.MQConsumerHandlerCallback +} + +// NewDiskConsumer 创建一个消费者 +func NewDiskConsumer(topic, channel string, handler model.MQConsumerHandlerCallback) (service.IConsumer, error) { + dqs, err := getClient(topic) + if err != nil { + return nil, err + } + id := "dqc_" + grand.S(16) + c := &consumer{ + Topic: topic, + Channel: channel, + Id: id, + Handler: handler, + } + dqs.RegisterConsumer(channel, c) + return c, nil +} + +func (s *consumer) CloseMqConsumer() { + dqs, err := getClient(s.Topic) + if err != nil { + g.Log("diskQueue").Error(context.TODO(), "执行 CloseMqConsumer 失败:"+err.Error()) + return + } + dqs.RemoveConsumer(s.Channel, s.Id) +} diff --git a/internal/app/mqueue/logic/diskqueue/producer.go b/internal/app/mqueue/logic/diskqueue/producer.go new file mode 100644 index 0000000..e001595 --- /dev/null +++ b/internal/app/mqueue/logic/diskqueue/producer.go @@ -0,0 +1,55 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/12 10:23 + */ + +package diskqueue + +import ( + "context" + "github.com/gogf/gf/v2/frame/g" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" +) + +type producer struct { + isClose bool +} + +func NewDiskQueueProducer() service.IProducer { + return &producer{} +} + +func (p *producer) Publish(topic string, body []byte) error { + dq, err := getClient(topic) + if err != nil { + return err + } + return dq.Publish(body) +} + +func (p *producer) PublishAsync(topic string, body []byte) error { + dq, err := getClient(topic) + go func() { + err = dq.Publish(body) + if err != nil { + g.Log("diskQueue").Error(context.TODO(), "diskQueue PublishAsync消息失败:"+err.Error()) + } + }() + return err +} + +func (p *producer) DelayPublish(topic string, body []byte, delay consts.MsgDelayLevel) error { + g.Log("diskQueue").Warning(context.TODO(), "diskQueue 不支持延时消息,将使用publish发送") + dq, err := getClient(topic) + if err != nil { + return err + } + return dq.Publish(body) +} + +func (p *producer) Close() { + p.isClose = true +} diff --git a/internal/app/mqueue/logic/logic.go b/internal/app/mqueue/logic/logic.go new file mode 100644 index 0000000..704deb0 --- /dev/null +++ b/internal/app/mqueue/logic/logic.go @@ -0,0 +1,11 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/28 10:34 + */ + +package logic + +// 启动消息队列服务 +import _ "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/mqueue" diff --git a/internal/app/mqueue/logic/mqueue/mqueue.go b/internal/app/mqueue/logic/mqueue/mqueue.go new file mode 100644 index 0000000..1bae653 --- /dev/null +++ b/internal/app/mqueue/logic/mqueue/mqueue.go @@ -0,0 +1,197 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/6/28 11:14 + */ + +package mqueue + +import ( + "context" + "errors" + "fmt" + "github.com/gogf/gf/v2/container/gpool" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/diskqueue" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/nsq" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/rocketmq" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +var ctx = context.TODO() +var mQueueMutex = sync.Mutex{} + +type mQueue struct { + producerPool *gpool.Pool + consumerInstanceMap map[string]service.IConsumer + logger glog.ILogger +} + +func init() { + isEnable := g.Cfg().MustGet(ctx, "mqueue.enable").Bool() + if !isEnable { + return + } + service.RegisterMQueue(New()) + g.Log().Info(ctx, "mqueue service is running") +} + +func New() service.IMQueue { + mq := &mQueue{ + producerPool: gpool.New(60*time.Second, func() (interface{}, error) { + //g.Log().Info(context.Background(), "创建新的producerPool 对象") + return createProducer() + }, func(i interface{}) { + i.(service.IProducer).Close() + i = nil + //g.Log().Warning(context.Background(), "producerPool 超时释放资源 ") + }), + consumerInstanceMap: make(map[string]service.IConsumer), + } + go func() { + // 监听程序退出命令,清理队列 + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, os.Kill, os.Interrupt) + for { + select { + case s := <-c: + if s == syscall.SIGINT || s == syscall.SIGTERM || s == syscall.SIGHUP || s == syscall.SIGQUIT || s == os.Interrupt || s == os.Kill { + fmt.Println("mqueue exiting...") + mq.Close() + closeWaitTime := g.Cfg().MustGet(ctx, "mqueue.closeWaitTime").Int64() + if closeWaitTime <= 0 { + closeWaitTime = 5 + } + time.Sleep(time.Duration(closeWaitTime) * time.Second) + os.Exit(0) + return + } + } + } + }() + return mq +} + +// CreateProducer 创建生产者 +func createProducer() (service.IProducer, error) { + mqDriver, err := g.Cfg().Get(context.TODO(), "mqueue.driver") + if err != nil { + return nil, err + } + var mqProducer service.IProducer + switch mqDriver.String() { + case "diskQueue": + mqProducer = diskqueue.NewDiskQueueProducer() + case "nsq": + mqProducer, err = nsq.NewNsqProducer() + case "rocketmq": + mqProducer, err = rocketmq.NewProducer() + default: + return nil, errors.New("没有找到消息队列驱动,请检查配置文件中的driver类型") + } + return mqProducer, nil +} + +// SendMsg 发送消息 +func (m *mQueue) SendMsg(msg *model.MQSendMsg) error { + // 从生产者对象池取一个 + pd, err := m.producerPool.Get() + if err != nil { + return err + } + if pd == nil { + return errors.New("从生产者对象池中获取生产者失败") + } + // 使用完放回到对象池 + defer func(producerPool *gpool.Pool, value interface{}) { + err2 := producerPool.Put(value) + if err2 != nil { + g.Log("mqueue").Error(ctx, "sendMsg defer producerPool put error:") + g.Log("mqueue").Error(ctx, err2.Error()) + } + }(m.producerPool, pd) + + switch msg.SendMethod { + case consts.SendMsgPublishAsync: + err = pd.(service.IProducer).PublishAsync(msg.Topic, msg.Body) + case consts.SendMsgDelay: + err = pd.(service.IProducer).DelayPublish(msg.Topic, msg.Body, msg.Delay) + default: + err = pd.(service.IProducer).Publish(msg.Topic, msg.Body) + } + return err +} + +// Subscribe 从指定的topic 和 channel 订阅消息,并使用回调函数来处理消息 +func (m *mQueue) Subscribe(topic, channel string, handler model.MQConsumerHandlerCallback) error { + mQueueMutex.Lock() + defer mQueueMutex.Unlock() + key := fmt.Sprintf("%s_%s", topic, channel) + // 一个频道尽量一个处理器以保证消息的原子性,多个应用消费同一主题需最好在handler里面自行处理错误 + if _, ok := m.consumerInstanceMap[key]; ok { + return errors.New(fmt.Sprintf("已经订阅过该主题 %s 和频道 %s", topic, channel)) + } + mqDriver, err := g.Cfg().Get(ctx, "mqueue.driver") + if err != nil { + return err + } + var mqConsumer service.IConsumer + switch mqDriver.String() { + case "diskQueue": + mqConsumer, err = diskqueue.NewDiskConsumer(topic, channel, handler) + case "nsq": + mqConsumer, err = nsq.NewNsqConsumer(topic, channel, handler) + case "rocketmq": + mqConsumer, err = rocketmq.NewPushConsumer(topic, channel, handler) + default: + return errors.New("没有找到消息队列驱动,请检查配置文件中的driver类型") + } + if err != nil { + return err + } + m.consumerInstanceMap[key] = mqConsumer + return err +} + +// Close 关闭消息队列 +func (m *mQueue) Close() { + // 清空生产者 + m.producerPool.Clear() + // 关闭消费者 + for key, c := range m.consumerInstanceMap { + c.CloseMqConsumer() + delete(m.consumerInstanceMap, key) + } + mqDriver, err := g.Cfg().Get(ctx, "mqueue.driver") + if err != nil { + g.Log().Error(ctx, "获取 mqueue.driver 失败") + return + } + switch mqDriver.String() { + case "diskQueue": + diskqueue.CloseDiskQueueService() + } +} + +// Unsubscribe 取消订阅主题 +func (m *mQueue) Unsubscribe(topic, channel string) error { + mQueueMutex.Lock() + defer mQueueMutex.Unlock() + key := fmt.Sprintf("%s_%s", topic, channel) + if _, ok := m.consumerInstanceMap[key]; ok { + m.consumerInstanceMap[key].CloseMqConsumer() + delete(m.consumerInstanceMap, key) + return nil + } else { + return errors.New("没有找到订阅信息") + } +} diff --git a/internal/app/mqueue/logic/nsq/consumer.go b/internal/app/mqueue/logic/nsq/consumer.go new file mode 100644 index 0000000..c7887cb --- /dev/null +++ b/internal/app/mqueue/logic/nsq/consumer.go @@ -0,0 +1,90 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/3 10:44 + */ + +package nsq + +import ( + "context" + "errors" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nsqio/go-nsq" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" +) + +type nsqConsumer struct { + consumer *nsq.Consumer + nsqConfig *nsqConsumerConnConfig + Handler model.MQConsumerHandlerCallback +} + +type nsqConsumerConnConfig struct { + Addr string // NsqLookupd 服务器地址 + Port uint // 服务器端口号 + LocalAddr string // 本地IP地址 + AuthSecret string // 认证秘钥 + LookupdAuthorization bool // 是否开启认证 +} + +func (c *nsqConsumer) CloseMqConsumer() { + c.consumer.Stop() +} + +// HandleMessage 处理go_nsq转发的消息,返回nil则表式处理完成 +func (c *nsqConsumer) HandleMessage(m *nsq.Message) error { + // 空消息不用处理 + if len(m.Body) == 0 { + return nil + } + var id model.MessageID + copy(id[:], m.ID[:16]) + mqMsgRes := &model.MQMessage{ + ID: id, + Body: m.Body, + Timestamp: m.Timestamp, + } + return c.Handler(mqMsgRes) +} + +// NewNsqConsumer 创建NSQConsumer实例 +func NewNsqConsumer(topic, channel string, handler model.MQConsumerHandlerCallback) (service.IConsumer, error) { + addr := g.Cfg().MustGet(context.TODO(), "mqueue.nsq.address").String() + port := g.Cfg().MustGet(context.TODO(), "mqueue.nsq.consumer_port").Uint() + if addr == "" || port < 1 { + return nil, errors.New("nsq 配置读取错误") + } + + config := nsq.NewConfig() + consumer, err := nsq.NewConsumer(topic, channel, config) + + consumer.SetLoggerLevel(nsq.LogLevel(2)) + if err != nil { + return nil, err + } + nsqConsumerConfig := &nsqConsumerConnConfig{ + Addr: addr, + Port: port, + } + nsqC := &nsqConsumer{ + nsqConfig: nsqConsumerConfig, + consumer: consumer, + Handler: handler, + } + // 添加消息处理器 + consumer.AddHandler(nsqC) + err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%d", nsqConsumerConfig.Addr, nsqConsumerConfig.Port)) + if err != nil { + return nil, err + } + stats := consumer.Stats() + if stats.Connections <= 0 { + nsqC = nil + err = errors.New("未能连接到nsq lookup 服务器,请检查错误日志") + } + return nsqC, err +} diff --git a/internal/app/mqueue/logic/nsq/producer.go b/internal/app/mqueue/logic/nsq/producer.go new file mode 100644 index 0000000..3c26f14 --- /dev/null +++ b/internal/app/mqueue/logic/nsq/producer.go @@ -0,0 +1,67 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/3 10:44 + */ + +package nsq + +import ( + "context" + "errors" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nsqio/go-nsq" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" + "time" +) + +type nsqProducer struct { + Producer *nsq.Producer +} + +// NewNsqProducer 创建操作NSQProducer的实例 +func NewNsqProducer() (service.IProducer, error) { + config := nsq.NewConfig() + address := g.Cfg().MustGet(context.TODO(), "mqueue.nsq.address").String() + port := g.Cfg().MustGet(context.TODO(), "mqueue.nsq.producer_port").Uint() + if address == "" || port <= 0 { + return nil, errors.New("配置文件有错误") + } + producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", address, port), config) + if err != nil { + return nil, err + } + return &nsqProducer{Producer: producer}, nil +} + +func (p *nsqProducer) Publish(topic string, body []byte) error { + return p.Producer.Publish(topic, body) +} + +func (p *nsqProducer) PublishAsync(topic string, body []byte) error { + doneChan := make(chan *nsq.ProducerTransaction) + go func() { + timeout := time.NewTicker(5 * time.Second) + select { + case res := <-doneChan: + if res.Error != nil { + g.Log("nsq").Error(context.TODO(), "publishAsync error doneChan 返回错误:"+res.Error.Error()) + } + case <-timeout.C: + g.Log("nsq").Error(context.TODO(), "PublishAsync error doneChan 返回超时") + } + + }() + return p.Producer.PublishAsync(topic, body, doneChan) +} + +func (p *nsqProducer) DelayPublish(topic string, body []byte, delay consts.MsgDelayLevel) error { + return p.Producer.DeferredPublish(topic, delay.Value(), body) +} + +func (p *nsqProducer) Close() { + p.Producer.Stop() +} diff --git a/internal/app/mqueue/logic/rocketmq/consumer.go b/internal/app/mqueue/logic/rocketmq/consumer.go new file mode 100644 index 0000000..7900dc3 --- /dev/null +++ b/internal/app/mqueue/logic/rocketmq/consumer.go @@ -0,0 +1,71 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/24 9:54 + */ + +package rocketmq + +import ( + "context" + "errors" + "github.com/apache/rocketmq-client-go/v2" + rmq_consumer "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/rlog" + "github.com/gogf/gf/v2/frame/g" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" +) + +type rocketmqConsumer struct { + topic string + pushConsumer rocketmq.PushConsumer +} + +// NewPushConsumer 创建MyRocketmq消费者实例 +func NewPushConsumer(topic, groupName string, handler model.MQConsumerHandlerCallback) (service.IConsumer, error) { + endPoint := g.Cfg().MustGet(context.TODO(), "mqueue.rocketmq.nameServers").Strings() + logLevel := g.Cfg().MustGet(context.TODO(), "mqueue.rocketmq.logLevel").String() + if len(endPoint) < 1 { + return nil, errors.New("配置文件不正确,获取mqueue.rocketmq.nameServers失败") + } + // 创建rocketmq实例 + rlog.SetLogLevel(logLevel) + rmqC, err := rocketmq.NewPushConsumer( + rmq_consumer.WithNameServer(endPoint), + rmq_consumer.WithGroupName(groupName), + ) + + if err != nil { + return nil, err + } + err = rmqC.Subscribe(topic, rmq_consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (rmq_consumer.ConsumeResult, error) { + for _, msg := range msgs { + var id model.MessageID + copy(id[:], msg.MsgId[:32]) + handler(&model.MQMessage{ + ID: id, + Body: msg.Body, + Attempts: 0, + Timestamp: msg.BornTimestamp, + }) + } + // 无论如何都返回成功,消费失败自行在处理器里处理 + return rmq_consumer.ConsumeSuccess, nil + }) + err = rmqC.Start() + return &rocketmqConsumer{pushConsumer: rmqC}, nil +} + +func (c *rocketmqConsumer) CloseMqConsumer() { + var err error + if c.pushConsumer != nil { + err = c.pushConsumer.Shutdown() + } + if err != nil { + g.Log("rocketmq").Error(context.TODO(), errors.New("rocketmq CloseMqConsumer 失败:")) + g.Log("rocketmq").Error(context.TODO(), err) + } +} diff --git a/internal/app/mqueue/logic/rocketmq/producer.go b/internal/app/mqueue/logic/rocketmq/producer.go new file mode 100644 index 0000000..e5e0489 --- /dev/null +++ b/internal/app/mqueue/logic/rocketmq/producer.go @@ -0,0 +1,87 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/24 9:54 + */ + +package rocketmq + +import ( + "context" + "errors" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + rmq_producer "github.com/apache/rocketmq-client-go/v2/producer" + "github.com/gogf/gf/v2/frame/g" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" +) + +type producer struct { + rocketmqProducer rocketmq.Producer +} + +// NewProducer 创建rocketmq生产者实例 +func NewProducer() (service.IProducer, error) { + endPoint := g.Cfg().MustGet(context.TODO(), "mqueue.rocketmq.nameServers") + retry := g.Cfg().MustGet(context.TODO(), "mqueue.rocketmq.retry").Int() + groupName := g.Cfg().MustGet(context.TODO(), "mqueue.channel").String() + rmqP, err := rocketmq.NewProducer(rmq_producer.WithNameServer(endPoint.Strings()), rmq_producer.WithRetry(retry), rmq_producer.WithGroupName(groupName)) + if err != nil { + return nil, err + } + if rmqP == nil { + return nil, errors.New("创建生产者失败") + } + err = rmqP.Start() + if err != nil { + return nil, err + } + return &producer{rocketmqProducer: rmqP}, nil +} + +func (p *producer) Publish(topic string, body []byte) error { + msg := &primitive.Message{ + Topic: topic, + Body: body, + } + _, err := p.rocketmqProducer.SendSync(context.TODO(), msg) + return err +} + +func (p *producer) PublishAsync(topic string, body []byte) error { + msg := &primitive.Message{ + Topic: topic, + Body: body, + } + err := p.rocketmqProducer.SendAsync(context.TODO(), func(ctx context.Context, result *primitive.SendResult, err error) { + if err != nil { + g.Log("rocketmq").Error(context.TODO(), errors.New("rocketmq PublishAsync 失败:")) + g.Log("rocketmq").Error(context.TODO(), err) + } + }, msg) + return err +} + +func (p *producer) DelayPublish(topic string, body []byte, delay consts.MsgDelayLevel) error { + msg := &primitive.Message{ + Topic: topic, + Body: body, + } + if delay < 1 || delay > 18 { + delay = 1 + } + msg.WithDelayTimeLevel(int(delay)) + _, err := p.rocketmqProducer.SendSync(context.TODO(), msg) + return err +} + +func (p *producer) Close() { + if p.rocketmqProducer != nil { + err := p.rocketmqProducer.Shutdown() + if err != nil { + g.Log("rocketmq").Error(context.TODO(), err) + } + } +} diff --git a/internal/app/mqueue/model/diskqueue.go b/internal/app/mqueue/model/diskqueue.go new file mode 100644 index 0000000..28a43ac --- /dev/null +++ b/internal/app/mqueue/model/diskqueue.go @@ -0,0 +1,19 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/7 15:36 + */ + +package model + +import "time" + +type DiskQueueConfig struct { + DataPath string // 磁盘存储路径 + MaxBytesPerFile int64 // 单个单件最大字节数 + SyncEvery int64 // 多少次读写后刷盘 + SyncTimeout time.Duration // 刷盘间隔(秒) + MaxMsgSize int32 // 最大消息字节数 + MinMsgSize int32 // 最小消息字节数 +} diff --git a/internal/app/mqueue/model/mqueue.go b/internal/app/mqueue/model/mqueue.go new file mode 100644 index 0000000..759c91f --- /dev/null +++ b/internal/app/mqueue/model/mqueue.go @@ -0,0 +1,37 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/3 8:57 + */ + +package model + +import ( + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" +) + +type MQConsumerHandlerCallback func(m *MQMessage) error + +type MQSendMsg struct { + // 主题 + Topic string + // 消息体 + Body []byte + // 发到频道 + Channel string + // 消息发送类型 + SendMethod consts.SendMsgMethod + // delayPublish 使用此参数,用于设置延迟消息等级,等级只能使用整数1-18 对应时间: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + Delay consts.MsgDelayLevel +} + +// MQMessage 消息返回结构体 +type MQMessage struct { + ID MessageID + Body []byte + Attempts uint16 + Timestamp int64 +} + +type MessageID [consts.MsgIdLength]byte diff --git a/internal/app/mqueue/mqueue_test.go b/internal/app/mqueue/mqueue_test.go new file mode 100644 index 0000000..514d3a6 --- /dev/null +++ b/internal/app/mqueue/mqueue_test.go @@ -0,0 +1,168 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/7 11:26 + */ + +package mqueue + +import ( + "bytes" + "fmt" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/diskqueue" + _ "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/mqueue" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/service" + "sync" + "testing" + "time" +) + +var mBody = []byte("gfast-mqueue 测试消息队列内容") +var wg = sync.WaitGroup{} + +const ( + TOPIC = "producer_test6" + SENDCOUNT = 10 + // 最好等待10秒来刷盘或更新rocketmq消费偏移 + TIMEOUT = 10 +) + +// channel 在rocketmq中可用,delay 1-18 对应时间: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h +func producer(topic string, delay consts.MsgDelayLevel, timeout time.Duration, t *testing.T) { + mq := service.MQueue() + if mq == nil { + t.Error("get mQueue failed") + return + } + fmt.Println("start send msg") + t1 := time.Now() + pCount := 0 + for i := 0; i < SENDCOUNT; i++ { + msg := &model.MQSendMsg{ + Topic: topic, + Body: mBody, + } + if delay > 0 { + msg.Delay = delay + msg.SendMethod = consts.SendMsgDelay + } + err := mq.SendMsg(msg) + if err != nil { + t.Error(err) + return + } + pCount++ + } + fmt.Println(fmt.Sprintf("发送数据 %d 条,耗时:%f", pCount, time.Since(t1).Seconds())) + // 如果是diskqueue至少等待一次刷盘时,避免未刷就退出主线程 + time.Sleep(timeout * time.Second) +} + +func consumer(topic, channel string, timeout time.Duration, t *testing.T) { + fmt.Println(fmt.Sprintf("消费者 %s %s 启动", topic, channel)) + cChan := make(chan []byte) + cCount := 0 + go func() { + for { + select { + case <-cChan: + cCount++ + } + } + }() + mq := service.MQueue() + if mq == nil { + t.Error("get mQueue failed") + } + // 订阅消息 + err := mq.Subscribe(topic, channel, func(m *model.MQMessage) error { + if !bytes.Equal(mBody, m.Body) { + fmt.Println(fmt.Sprintf("消费者1第 %d 条数据错误", cCount)) + } + cChan <- m.Body + //fmt.Println(m.ID, m.Timestamp) + return nil + }) + if err != nil { + t.Error("消息订阅失败:" + err.Error()) + return + } + + // 至少等待一次刷盘或同步消费偏移,避免未刷就退出主线程 + time.Sleep(timeout * time.Second) + fmt.Println(fmt.Sprintf("%s %s 消费数据 %d 条", topic, channel, cCount)) +} + +// 测试生产者 +func TestProducer(t *testing.T) { + producer(TOPIC, 0, 5, t) +} + +// 测试生产者 +func TestProducerDelay(t *testing.T) { + fmt.Println("开始发送延迟消息") + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + producer("produce_delay_test1", consts.MsgDelay5s, 5, t) + wg.Done() + }() + go func() { + consumer("produce_delay_test1", "channel", 20, t) + wg.Done() + }() + wg.Wait() +} + +// 测试消费者 +func TestConsumer(t *testing.T) { + consumer(TOPIC, "channel", 120, t) +} + +// 测试多个消费者 +func TestMultiConsumer(t *testing.T) { + wg.Add(3) + go func() { + consumer(TOPIC, "channel", TIMEOUT, t) + wg.Done() + }() + go func() { + consumer(TOPIC, "channel1", TIMEOUT, t) + wg.Done() + }() + go func() { + consumer(TOPIC, "channel2", TIMEOUT, t) + wg.Done() + }() + wg.Wait() +} + +// 同时测试生产者和消费者 +func TestProducerAndConsumer(t *testing.T) { + wg.Add(4) + go func() { + producer(TOPIC, 0, 5, t) + wg.Done() + }() + go func() { + consumer(TOPIC, "channel", TIMEOUT, t) + wg.Done() + }() + go func() { + consumer(TOPIC, "channel2", TIMEOUT, t) + wg.Done() + }() + go func() { + consumer(TOPIC, "channel3", TIMEOUT, t) + wg.Done() + }() + wg.Wait() +} + +// 测试删除diskQueue 的所有本地文件 +func TestClearDiskQueueFiles(t *testing.T) { + diskqueue.Clear() +} diff --git a/internal/app/mqueue/router/demo.go b/internal/app/mqueue/router/demo.go new file mode 100644 index 0000000..d3c84ef --- /dev/null +++ b/internal/app/mqueue/router/demo.go @@ -0,0 +1,25 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/28 14:56 + */ + +package router + +import ( + "context" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/controller" +) + +var R = new(Router) + +type Router struct{} + +func (router *Router) BindController(ctx context.Context, group *ghttp.RouterGroup) { + group.Group("/mqueue/demo", func(group *ghttp.RouterGroup) { + group.POST("/produce", controller.Demo.Produce) + group.ALL("/subscribe", controller.Demo.Subscribe) + }) +} diff --git a/internal/app/mqueue/service/mqueue.go b/internal/app/mqueue/service/mqueue.go new file mode 100644 index 0000000..85c5989 --- /dev/null +++ b/internal/app/mqueue/service/mqueue.go @@ -0,0 +1,50 @@ +/** + * @Company: 云南奇讯科技有限公司 + * @Author: yxf + * @Description: + * @Date: 2023/7/3 8:56 + */ + +package service + +import ( + "github.com/tiger1103/gfast/v3/internal/app/mqueue/consts" + "github.com/tiger1103/gfast/v3/internal/app/mqueue/model" +) + +type IConsumer interface { + // CloseMqConsumer 关闭消费者 + CloseMqConsumer() +} + +// IProducer 消息队列生产者接口 +type IProducer interface { + // Publish 发布单条消息并等待结果 + Publish(topic string, body []byte) error + // PublishAsync 发布单条异步消息,不用等待服务器返回结果 + PublishAsync(topic string, body []byte) error + // DelayPublish 延时发布消息并等待结果,delay单位延迟等级1-18级对应: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + DelayPublish(topic string, body []byte, delay consts.MsgDelayLevel) error + // Close 关闭生产者 + Close() +} + +type IMQueue interface { + SendMsg(m *model.MQSendMsg) error + Subscribe(topic, channel string, handler model.MQConsumerHandlerCallback) error + Close() + Unsubscribe(topic, channel string) error +} + +var localMQueue IMQueue + +func MQueue() IMQueue { + if localMQueue == nil { + panic("implement not found for interface IMQueue, forgot register?") + } + return localMQueue +} + +func RegisterMQueue(i IMQueue) { + localMQueue = i +} diff --git a/manifest/config/config.yaml.bak b/manifest/config/config.yaml.bak index f24bdf7..82d4ee9 100644 --- a/manifest/config/config.yaml.bak +++ b/manifest/config/config.yaml.bak @@ -107,4 +107,26 @@ gen: autoRemovePre: true #是否自动删除表前缀 tablePrefix: "table_,qxkj_" #表前缀 templatePath: "./resource/template/vm" #代码生成模板路径 - frontDir: "../../../project/webProject/p2022/gfast-v3.2-ui/gfast3.2-ui" #前端路径 \ No newline at end of file + frontDir: "../../../project/webProject/p2022/gfast-v3.2-ui/gfast3.2-ui" #前端路径 + +# 消息队列 +mqueue: + enable: true + closeWaitTime: 5 # 关闭服务器时等待刷盘时间(秒),最好不要小于5 + driver: "diskQueue" # 驱动类型 nsq|diskQueue|rocketmq + channel: "channel" # 默认频道名称 + nsq: + address: "192.168.75.3" # nsq地址 + producer_port: 4150 # 生产者端口 + consumer_port: 4161 # 消费者http端口 + rocketmq: + nameServers: "192.168.75.3:9876" # 服务器地址 + retry: 2 # 重试次数 + logLevel: "warn" # 日志级别 debug|error|warn|fatal + diskQueue: + dataPath: "./resource/data/storage/diskqueue" # 存储路径 + maxBytesPerFile: 2097152 # 单个单件最大字节数(byte),默认:2m(2097152) + syncEvery: 2500 # 多少次读写后刷盘 + syncTimeout: 2 # 刷盘间隔(秒) + maxMsgSize: 128144 # 最大消息字节数(byte) + minMsgSize: 0 # 最小消息字节数 diff --git a/resource/template/vm/vue/edit-vue.template b/resource/template/vm/vue/edit-vue.template index f7808f7..86eb603 100644 --- a/resource/template/vm/vue/edit-vue.template +++ b/resource/template/vm/vue/edit-vue.template @@ -453,6 +453,11 @@ export default defineComponent({ }, {{end}} } + {{range $index, $column := .table.Columns}} + {{if eq $column.HtmlType "imagefile"}} + imageUrl{{$column.GoField}}.value = '' + {{end}} + {{end}} }; {{$setUpData:=true}} {{range $index, $column := .table.Columns}}