Skip to content

Commit 7bd431e

Browse files
committed
update
1 parent a0db159 commit 7bd431e

4 files changed

Lines changed: 274 additions & 27 deletions

File tree

README.md

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,76 @@
77
go get github.com/wanghaha-dev/asyncTask
88
```
99

10-
使用
10+
简单使用
1111
```go
1212
package main
1313

1414
import (
1515
"context"
1616
"fmt"
17+
"log"
18+
1719
"github.com/wanghaha-dev/asyncTask"
20+
)
21+
22+
func main() {
23+
task, err := asyncTask.NewTask(context.Background(), asyncTask.Config{
24+
Addr: "127.0.0.1:6379",
25+
DB: 0,
26+
Password: "",
27+
})
28+
if err != nil {
29+
panic(err)
30+
}
31+
32+
// put task
33+
taskId, err := asyncTask.GenerateTaskId()
34+
if err != nil {
35+
log.Fatalln("generate taskId error!")
36+
}
37+
task.PutNormalTask(taskId, asyncTask.Map{
38+
"name": "task1",
39+
"date": asyncTask.Datetime(),
40+
"user": "1001",
41+
})
42+
43+
// take task
44+
t1, err := task.TakeNormalTask()
45+
if err != nil {
46+
log.Fatalln("take normal task error:", err)
47+
}
48+
49+
fmt.Println("take task is: ", t1)
50+
fmt.Println(t1["task_id"], t1["data"])
51+
fmt.Println(t1["data"].(map[string]interface{})["name"])
52+
jsonContext, err := t1.GetJSON()
53+
if err != nil {
54+
log.Fatalln("GetJson error: ", err)
55+
}
56+
fmt.Println(t1.GetJSON())
57+
// {"data":{"date":"2022-08-26 10:26:44","name":"task1","user":"1001"},"task_id":"03efb9e08b284642b5fd1ea11ae20341"}
58+
fmt.Println(asyncTask.JSONGet(jsonContext, "data.name"))
59+
}
60+
```
61+
62+
63+
64+
并发使用
65+
```go
66+
package main
67+
68+
import (
69+
"context"
70+
"fmt"
71+
"log"
1872
"sync"
73+
74+
"github.com/wanghaha-dev/asyncTask"
1975
)
2076

2177
func main() {
22-
ctx := context.Background()
23-
task, err := asyncTask.NewTask(ctx, asyncTask.Config{
24-
Addr: "127.0.0.1:6379",
78+
task, err := asyncTask.NewTask(context.Background(), asyncTask.Config{
79+
Addr: "127.0.0.1:6379",
2580
DB: 0,
2681
Password: "",
2782
})
@@ -30,29 +85,38 @@ func main() {
3085
}
3186

3287
var wg sync.WaitGroup
33-
wg.Add(1)
88+
wg.Add(200)
3489

35-
// take task
90+
// put task
3691
go func() {
37-
asyncTask.Each(1000, func() {
38-
data, err := task.TakeNormalTask()
92+
asyncTask.Each(100, func() {
93+
taskId, err := asyncTask.GenerateTaskId()
3994
if err != nil {
40-
panic(err)
95+
log.Fatalln("generate taskId error!")
4196
}
42-
fmt.Println("data:", data)
97+
task.PutNormalTask(taskId, asyncTask.Map{
98+
"name": "task1",
99+
"date": asyncTask.Datetime(),
100+
"user": "1001",
101+
})
102+
103+
wg.Done()
43104
})
44-
wg.Done()
45105
}()
46106

47-
// put task
48-
asyncTask.Each(1000, func() {
49-
err = task.PutNormalTask("ddd", asyncTask.Map{
50-
"name": "wanghaha",
107+
// take task
108+
go func() {
109+
asyncTask.Each(100, func() {
110+
t1, err := task.TakeNormalTask()
111+
if err != nil {
112+
log.Fatalln("take normal task error:", err)
113+
}
114+
115+
fmt.Println("take task is: ", t1)
116+
117+
wg.Done()
51118
})
52-
if err != nil {
53-
panic(err)
54-
}
55-
})
119+
}()
56120

57121
wg.Wait()
58122
fmt.Println("finish.")

asyncTask.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/go-redis/redis/v8"
7+
"reflect"
8+
"strings"
89
"time"
10+
11+
"github.com/go-redis/redis/v8"
12+
"github.com/nacos-group/nacos-sdk-go/inner/uuid"
13+
"github.com/tidwall/gjson"
914
)
1015

1116
type Config struct {
@@ -67,7 +72,7 @@ func (t *Task) PutFailTask(taskId string, data Map) error {
6772
return t.putTask(taskId, "fail", data)
6873
}
6974

70-
func (t *Task) takeNormalTask(list string) (Map, error) {
75+
func (t *Task) takeNormalTask(list string) (map[string]interface{}, error) {
7176
sliceCmd := t.RedisClient.BRPop(t.ctx, 0, list)
7277

7378
err := sliceCmd.Err()
@@ -79,8 +84,9 @@ func (t *Task) takeNormalTask(list string) (Map, error) {
7984
if err != nil {
8085
return nil, err
8186
}
87+
fmt.Println("result:", result, reflect.TypeOf(result[1]))
8288

83-
data := make(map[string]interface{})
89+
data := make(Map)
8490
err = json.Unmarshal([]byte(result[1]), &data)
8591
if err != nil {
8692
return nil, err
@@ -157,6 +163,18 @@ func (t *Task) GetNormalTaskList() ([]string, error) {
157163
return t.getFailTaskList("normal")
158164
}
159165

166+
func (m Map) GetJSON() (string, error) {
167+
data, err := json.Marshal(m)
168+
if err != nil {
169+
return "", err
170+
}
171+
return string(data), nil
172+
}
173+
174+
func JSONGet(jsonContext string, key string) gjson.Result {
175+
return gjson.Get(jsonContext, key)
176+
}
177+
160178
func EachStrings(data []string) {
161179
for _, item := range data {
162180
fmt.Println(item)
@@ -172,3 +190,11 @@ func Each(count int, f func()) {
172190
func Datetime() string {
173191
return time.Now().Format("2006-01-02 15:04:05")
174192
}
193+
194+
func GenerateTaskId() (string, error) {
195+
u, err := uuid.NewV4()
196+
if err != nil {
197+
return "", err
198+
}
199+
return strings.ReplaceAll(u.String(), "-", ""), nil
200+
}

go.mod

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@ module github.com/wanghaha-dev/asyncTask
22

33
go 1.17
44

5+
require (
6+
github.com/go-redis/redis/v8 v8.11.5
7+
github.com/nacos-group/nacos-sdk-go v1.1.2
8+
)
9+
510
require (
611
github.com/cespare/xxhash/v2 v2.1.2 // indirect
712
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
8-
github.com/go-redis/redis/v8 v8.11.5 // indirect
13+
github.com/tidwall/gjson v1.14.3 // indirect
14+
github.com/tidwall/match v1.1.1 // indirect
15+
github.com/tidwall/pretty v1.2.0 // indirect
916
)

0 commit comments

Comments
 (0)