Skip to content

Commit ca38b01

Browse files
王哈哈王哈哈
authored andcommitted
init
0 parents  commit ca38b01

6 files changed

Lines changed: 258 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea

README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Async Task
2+
3+
![](asyncTask.jpg)
4+
5+
安装
6+
```go
7+
go get github.com/wanghaha-dev/asyncTask
8+
```
9+
10+
使用
11+
```go
12+
package main
13+
14+
import (
15+
"context"
16+
"fmt"
17+
"github.com/wanghaha-dev/asyncTask/asyncTask"
18+
"sync"
19+
)
20+
21+
func main() {
22+
ctx := context.Background()
23+
task, err := asyncTask.NewTask(ctx, asyncTask.Config{
24+
Addr: "127.0.0.1:6379",
25+
DB: 0,
26+
Password: "",
27+
})
28+
if err != nil {
29+
panic(err)
30+
}
31+
32+
var wg sync.WaitGroup
33+
wg.Add(1)
34+
35+
// take task
36+
go func() {
37+
asyncTask.Each(1000, func() {
38+
data, err := task.TakeNormalTask()
39+
if err != nil {
40+
panic(err)
41+
}
42+
fmt.Println("data:", data)
43+
})
44+
wg.Done()
45+
}()
46+
47+
// put task
48+
asyncTask.Each(1000, func() {
49+
err = task.PutNormalTask("ddd", asyncTask.Map{
50+
"name": "wanghaha",
51+
})
52+
if err != nil {
53+
panic(err)
54+
}
55+
})
56+
57+
wg.Wait()
58+
fmt.Println("finish.")
59+
}
60+
```

asyncTask.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package asyncTask
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/go-redis/redis/v8"
8+
"time"
9+
)
10+
11+
type Config struct {
12+
Addr string
13+
DB int
14+
Password string
15+
}
16+
17+
func NewTask(ctx context.Context, config Config) (*Task, error) {
18+
rdb := redis.NewClient(&redis.Options{
19+
Addr: config.Addr,
20+
Password: config.Password, // no password set
21+
DB: config.DB, // use default DB
22+
})
23+
24+
task := &Task{
25+
RedisClient: rdb,
26+
ctx: ctx,
27+
}
28+
29+
return task, nil
30+
}
31+
32+
type Map map[string]interface{}
33+
34+
type Task struct {
35+
RedisClient *redis.Client
36+
ctx context.Context
37+
}
38+
39+
func (t *Task) putTask(taskId string, list string, data Map) error {
40+
d := struct {
41+
TaskId string `json:"task_id"`
42+
Data Map `json:"data"`
43+
}{
44+
taskId,
45+
data,
46+
}
47+
48+
taskContent, err := json.Marshal(d)
49+
if err != nil {
50+
return err
51+
}
52+
53+
err = t.RedisClient.LPush(t.ctx, list, taskContent).Err()
54+
if err != nil {
55+
return err
56+
}
57+
return nil
58+
}
59+
60+
func (t *Task) PutNormalTask(taskId string, data Map) error {
61+
return t.putTask(taskId, "normal", data)
62+
}
63+
func (t *Task) PutSuccessTask(taskId string, data Map) error {
64+
return t.putTask(taskId, "success", data)
65+
}
66+
func (t *Task) PutFailTask(taskId string, data Map) error {
67+
return t.putTask(taskId, "fail", data)
68+
}
69+
70+
func (t *Task) takeNormalTask(list string) (Map, error) {
71+
sliceCmd := t.RedisClient.BRPop(t.ctx, 0, list)
72+
73+
err := sliceCmd.Err()
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
result, err := sliceCmd.Result()
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
data := make(map[string]interface{})
84+
err = json.Unmarshal([]byte(result[1]), &data)
85+
if err != nil {
86+
return nil, err
87+
}
88+
return data, nil
89+
}
90+
91+
func (t *Task) TakeNormalTask() (Map, error) {
92+
data, err := t.takeNormalTask("normal")
93+
if err != nil {
94+
return nil, err
95+
}
96+
return data, nil
97+
}
98+
99+
func (t *Task) TakeSuccessTask() (Map, error) {
100+
data, err := t.takeNormalTask("success")
101+
if err != nil {
102+
return nil, err
103+
}
104+
return data, nil
105+
}
106+
107+
func (t *Task) TakeFailTask() (Map, error) {
108+
data, err := t.takeNormalTask("fail")
109+
if err != nil {
110+
return nil, err
111+
}
112+
return data, nil
113+
}
114+
115+
func (t *Task) getTaskLength(list string) (int64, error) {
116+
lLen := t.RedisClient.LLen(t.ctx, list)
117+
err := lLen.Err()
118+
119+
if err != nil {
120+
return -1, err
121+
}
122+
123+
return lLen.Result()
124+
}
125+
126+
func (t *Task) GetFailTaskLength() (int64, error) {
127+
return t.getTaskLength("fail")
128+
}
129+
130+
func (t *Task) GetSuccessTaskLength() (int64, error) {
131+
return t.getTaskLength("success")
132+
}
133+
134+
func (t *Task) GetNormalTaskLength() (int64, error) {
135+
return t.getTaskLength("normal")
136+
}
137+
138+
func (t *Task) getFailTaskList(list string) ([]string, error) {
139+
sliceCmd := t.RedisClient.LRange(t.ctx, list, 0, -1)
140+
141+
err := sliceCmd.Err()
142+
if err != nil {
143+
return []string{}, err
144+
}
145+
return sliceCmd.Result()
146+
}
147+
148+
func (t *Task) GetFailTaskList() ([]string, error) {
149+
return t.getFailTaskList("fail")
150+
}
151+
152+
func (t *Task) GetSuccessTaskList() ([]string, error) {
153+
return t.getFailTaskList("success")
154+
}
155+
156+
func (t *Task) GetNormalTaskList() ([]string, error) {
157+
return t.getFailTaskList("normal")
158+
}
159+
160+
func EachStrings(data []string) {
161+
for _, item := range data {
162+
fmt.Println(item)
163+
}
164+
}
165+
166+
func Each(count int, f func()) {
167+
for i := 0; i < count; i++ {
168+
f()
169+
}
170+
}
171+
172+
func Datetime() string {
173+
return time.Now().Format("2006-01-02 15:04:05")
174+
}

asyncTask.jpg

88.5 KB
Loading

go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github.com/wanghaha-dev/asyncTask
2+
3+
go 1.17
4+
5+
require (
6+
github.com/cespare/xxhash/v2 v2.1.2 // indirect
7+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
8+
github.com/go-redis/redis/v8 v8.11.5 // indirect
9+
)

go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
2+
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
5+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
6+
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
7+
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
8+
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
9+
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
10+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
11+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
12+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
13+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
14+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 commit comments

Comments
 (0)