Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 28ee958

Browse files
committed
[refactor] load data from level db.
Signed-off-by: Wang Xu <gnawux@gmail.com>
1 parent 8861dbb commit 28ee958

File tree

6 files changed

+231
-97
lines changed

6 files changed

+231
-97
lines changed

daemon/daemon.go

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package daemon
22

33
import (
4-
"encoding/json"
54
"errors"
65
"fmt"
76
"os"
@@ -46,46 +45,30 @@ func (daemon *Daemon) Restore() error {
4645
return nil
4746
}
4847

49-
ch := daemon.db.GetAllPods()
48+
ch := pod.LoadAllPods(daemon.db)
5049
if ch == nil {
5150
estr := "Cannot list pods in leveldb"
5251
glog.Error(estr)
5352
return errors.New(estr)
5453
}
5554

5655
for {
57-
item, ok := <-ch
56+
layout, ok := <-ch
5857
if !ok {
5958
break
6059
}
61-
if item == nil {
60+
if layout == nil {
6261
estr := "error during load pods from leveldb"
6362
glog.Error(estr)
6463
return errors.New(estr)
6564
}
6665

67-
podId := string(item.K[4:])
68-
69-
glog.V(1).Infof("reloading pod %s with args %s", podId, string(item.V))
70-
71-
daemon.db.DeletePod(podId)
72-
73-
var podSpec apitypes.UserPod
74-
err := json.Unmarshal(item.V, &podSpec)
75-
if err != nil {
76-
return err
77-
}
78-
79-
vmId, err := daemon.db.GetP2V(podId)
80-
if err != nil {
81-
glog.V(1).Infof("no existing VM for pod %s: %v", podId, err)
82-
}
83-
66+
glog.V(1).Infof("reloading pod %s: %#v", layout.Id, layout)
8467
fc := pod.NewPodFactory(daemon.Factory, daemon.PodList, daemon.db, daemon.Storage, daemon.Daemon, daemon.DefaultLog)
8568

86-
p, err := pod.LoadXPod(fc, &podSpec, vmId)
69+
p, err := pod.LoadXPod(fc, layout)
8770
if err != nil {
88-
glog.Warningf("Got a unexpected error when creating(load) pod %s, %v", podId, err)
71+
glog.Warningf("Got a unexpected error when creating(load) pod %s, %v", layout.Id, err)
8972
continue
9073
}
9174

daemon/daemondb/daemondb.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,11 @@ func (d *DaemonDB) PrefixList2Chan(prefix []byte, keyFilter KeyFilter) chan *KVP
227227
for iter.Next() {
228228
glog.V(3).Infof("got key from leveldb %s", string(iter.Key()))
229229
if keyFilter == nil || keyFilter(iter.Key()) {
230-
ch <- &KVPair{append([]byte{}, iter.Key()...), append([]byte{}, iter.Value()...)}
230+
k := make([]byte, 0, len(iter.Key()))
231+
v := make([]byte, 0, len(iter.Value()))
232+
copy(k, iter.Key())
233+
copy(v, iter.Value())
234+
ch <- &KVPair{k, v}
231235
}
232236
}
233237
iter.Release()

daemon/pod/persist.go

Lines changed: 214 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import (
44
"fmt"
55

66
"github.com/golang/protobuf/proto"
7-
"github.com/hyperhq/hyperd/types"
87
"github.com/hyperhq/hypercontainer-utils/hlog"
8+
"github.com/hyperhq/hyperd/daemon/daemondb"
9+
"github.com/hyperhq/hyperd/types"
910
)
1011

1112
/// Layout of Persistent Info of a Pod:
@@ -25,19 +26,105 @@ import (
2526

2627
const (
2728
LAYOUT_KEY_PREFIX = "PL-"
28-
LAYOUT_KEY_FMT = "PL-%s"
29-
SB_KEY_FMT = "SB-%s"
30-
PS_KEY_FMT = "PS-%s"
31-
PM_KEY_FMT = "PM-%s"
32-
CX_KEY_FMT = "CX-%s"
33-
VX_KEY_FMT = "VX-%s-%s"
34-
IF_KEY_FMT = "IF-%s-%s"
29+
LAYOUT_KEY_FMT = "PL-%s"
30+
SB_KEY_FMT = "SB-%s"
31+
PS_KEY_FMT = "PS-%s"
32+
PM_KEY_FMT = "PM-%s"
33+
CX_KEY_FMT = "CX-%s"
34+
VX_KEY_FMT = "VX-%s-%s"
35+
IF_KEY_FMT = "IF-%s-%s"
3536
)
3637

38+
func LoadAllPods(db *daemondb.DaemonDB) chan *types.PersistPodLayout {
39+
kvchan := db.PrefixList2Chan([]byte(LAYOUT_KEY_PREFIX), nil)
40+
if kvchan == nil {
41+
return nil
42+
}
43+
ch := make(chan *types.PersistPodLayout, 128)
44+
go func() {
45+
for {
46+
kv, ok := <-kvchan
47+
if !ok {
48+
hlog.Log(INFO, "layout loading finished")
49+
close(ch)
50+
return
51+
}
52+
hlog.Log(TRACE, "loading layout of container %s", string(kv.K))
53+
54+
var layout types.PersistPodLayout
55+
err := proto.Unmarshal(kv.V, &layout)
56+
if err != nil {
57+
hlog.Log(ERROR, "failed to decode layout of contaienr %s: %v", string(kv.K), err)
58+
continue
59+
}
60+
ch <- &layout
61+
}
62+
}()
63+
return ch
64+
}
65+
66+
func LoadXPod(factory *PodFactory, layout *types.PersistPodLayout) (*XPod, error) {
67+
spec, err := loadGloabalSpec(factory.db, layout.Id)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
p, err := newXPod(factory, spec)
73+
if err != nil {
74+
hlog.Log(ERROR, "failed to create pod from spec: %v", err)
75+
//remove spec from daemonDB
76+
//remove vm from daemonDB
77+
return nil, err
78+
}
79+
err = p.reserveNames(spec.Containers)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
for _, ix := range layout.Interfaces {
85+
if err := p.loadInterface(ix); err != nil {
86+
return nil, err
87+
}
88+
}
89+
90+
for _, vid := range layout.Volumes {
91+
if err := p.loadVolume(vid); err != nil {
92+
return nil, err
93+
}
94+
}
95+
96+
for _, cid := range layout.Containers {
97+
if err := p.loadContainer(cid); err != nil {
98+
return nil, err
99+
}
100+
}
101+
102+
err = p.loadSandbox()
103+
if err != nil {
104+
//remove vm from daemonDB
105+
return nil, err
106+
}
107+
108+
err = p.loadPodMeta()
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
//resume logging
114+
if p.status == S_POD_RUNNING {
115+
for _, c := range p.containers {
116+
c.startLogging()
117+
}
118+
}
119+
120+
// don't need to reserve name again, because this is load
121+
return p, nil
122+
}
123+
37124
func (p *XPod) savePod() error {
38125
var (
39126
containers = make([]string, 0, len(p.containers))
40-
volumes = make([]string, 0, len(p.volumes))
127+
volumes = make([]string, 0, len(p.volumes))
41128
interfaces = make([]string, 0, len(p.interfaces))
42129
)
43130

@@ -63,76 +150,171 @@ func (p *XPod) savePod() error {
63150
}
64151
}
65152

66-
for inf, i := range p.interfaces {
153+
for inf, i := range p.interfaces {
67154
interfaces = append(interfaces, inf)
68155
if err := i.saveInterface(); err != nil {
69156
return err
70157
}
71158
}
72159

73160
pl := &types.PersistPodLayout{
74-
Id: p.Id(),
161+
Id: p.Id(),
75162
Containers: containers,
76-
Volumes: volumes,
163+
Volumes: volumes,
77164
Interfaces: interfaces,
78165
}
79-
return p.saveMessage(fmt.Sprintf(LAYOUT_KEY_FMT, p.Id()), pl, p, "pod layout")
166+
return saveMessage(p.factory.db, fmt.Sprintf(LAYOUT_KEY_FMT, p.Id()), pl, p, "pod layout")
80167
}
81168

82169
func (p *XPod) saveGlobalSpec() error {
83-
return p.saveMessage(fmt.Sprintf(PS_KEY_FMT, p.Id()), p.globalSpec, p, "global spec")
170+
return saveMessage(p.factory.db, fmt.Sprintf(PS_KEY_FMT, p.Id()), p.globalSpec, p, "global spec")
171+
}
172+
173+
func loadGloabalSpec(db *daemondb.DaemonDB, id string) (*types.UserPod, error) {
174+
var spec types.UserPod
175+
err := loadMessage(db, fmt.Sprintf(LAYOUT_KEY_FMT, id), &spec, nil, fmt.Sprintf("spec for %s", id))
176+
if err != nil {
177+
return nil, err
178+
}
179+
return &spec, nil
84180
}
85181

86182
func (p *XPod) savePodMeta() error {
87183
meta := &types.PersistPodMeta{
88-
Id: p.Id(),
184+
Id: p.Id(),
89185
Services: p.services,
90-
Labels: p.labels,
186+
Labels: p.labels,
187+
}
188+
if p.info != nil {
189+
meta.CreatedAt = p.info.CreatedAt
190+
}
191+
return saveMessage(p.factory.db, fmt.Sprintf(PM_KEY_FMT, p.Id()), meta, p, "pod meta")
192+
}
193+
194+
func (p *XPod) loadPodMeta() error {
195+
var meta types.PersistPodMeta
196+
err := loadMessage(p.factory.db, fmt.Sprintf(PM_KEY_FMT, p.Id()), &meta, p, "pod meta")
197+
if err != nil {
198+
return err
199+
}
200+
p.initPodInfo()
201+
if meta.CreatedAt > 0 {
202+
p.info.CreatedAt = meta.CreatedAt
91203
}
92-
return p.saveMessage(fmt.Sprintf(PM_KEY_FMT, p.Id()), meta, p, "pod meta")
204+
p.labels = meta.Labels
205+
p.services = meta.Services
206+
return nil
93207
}
94208

95209
func (c *Container) saveContainer() error {
96210
cx := &types.PersistContainer{
97-
Id: c.Id(),
98-
Pod: c.p.Id(),
99-
Spec: c.spec,
211+
Id: c.Id(),
212+
Pod: c.p.Id(),
213+
Spec: c.spec,
100214
Descript: c.descript,
101215
}
102-
return c.p.saveMessage(fmt.Sprintf(CX_KEY_FMT, c.Id()), cx, c, "container info")
216+
return saveMessage(c.p.factory.db, fmt.Sprintf(CX_KEY_FMT, c.Id()), cx, c, "container info")
217+
}
218+
219+
func (p *XPod) loadContainer(id string) error {
220+
var cx types.PersistContainer
221+
err := loadMessage(p.factory.db, fmt.Sprintf(CX_KEY_FMT, id), &cx, p, fmt.Sprintf("container info of %s", id))
222+
if err != nil {
223+
return err
224+
}
225+
c, err := newContainer(p, cx.Spec, false)
226+
if err != nil {
227+
p.Log(ERROR, "failed to reload container %s from spec: %v", id, err)
228+
return err
229+
}
230+
err = p.factory.registry.ReserveContainer(c.Id(), c.SpecName(), p.Id())
231+
if err != nil {
232+
p.Log(ERROR, "failed to register name of container %s (%s) during load", c.Id(), c.SpecName(), err)
233+
return err
234+
}
235+
p.containers[c.Id()] = c
236+
return nil
103237
}
104238

105239
func (v *Volume) saveVolume() error {
106240
vx := &types.PersistVolume{
107-
Name: v.spec.Name,
108-
Pod: v.p.Id(),
109-
Spec: v.spec,
241+
Name: v.spec.Name,
242+
Pod: v.p.Id(),
243+
Spec: v.spec,
110244
Descript: v.descript,
111245
}
112-
return v.p.saveMessage(fmt.Sprintf(VX_KEY_FMT, v.p.Id(), v.spec.Name), vx, v, "volume info")
246+
return saveMessage(v.p.factory.db, fmt.Sprintf(VX_KEY_FMT, v.p.Id(), v.spec.Name), vx, v, "volume info")
247+
}
248+
249+
func (p *XPod) loadVolume(id string) error {
250+
var vx types.PersistVolume
251+
err := loadMessage(p.factory.db, fmt.Sprintf(VX_KEY_FMT, p.Id(), id), &vx, p, fmt.Sprintf("volume info of %s", id))
252+
if err != nil {
253+
return err
254+
}
255+
v := newVolume(p, vx.Spec)
256+
v.descript = vx.Descript
257+
v.status = S_VOLUME_CREATED
258+
p.volumes[v.spec.Name] = v
259+
return nil
113260
}
114261

115262
func (inf *Interface) saveInterface() error {
116263
ix := &types.PersistInterface{
117-
Id: inf.descript.Id,
118-
Pod: inf.p.Id(),
119-
Spec: inf.spec,
264+
Id: inf.descript.Id,
265+
Pod: inf.p.Id(),
266+
Spec: inf.spec,
120267
Descript: inf.descript,
121268
}
122-
return inf.p.saveMessage(fmt.Sprintf(IF_KEY_FMT, inf.p.Id(), inf.descript.Id), ix, inf, "interface info")
269+
return saveMessage(inf.p.factory.db, fmt.Sprintf(IF_KEY_FMT, inf.p.Id(), inf.descript.Id), ix, inf, "interface info")
123270
}
124271

125-
func (p *XPod) saveMessage(key string, message proto.Message, owner hlog.LogOwner, op string) error {
272+
func (p *XPod) loadInterface(id string) error {
273+
var ix types.PersistInterface
274+
err := loadMessage(p.factory.db, fmt.Sprintf(IF_KEY_FMT, p.Id(), id), &ix, p, fmt.Sprintf("inf info of %s", id))
275+
if err != nil {
276+
return err
277+
}
278+
inf := newInterface(p, ix.Spec)
279+
inf.descript = ix.Descript
280+
p.interfaces[inf.descript.Id] = inf
281+
return nil
282+
}
283+
284+
func (p *XPod) loadSandbox() error {
285+
var sb types.SandboxPersistInfo
286+
err := loadMessage(p.factory.db, fmt.Sprintf(SB_KEY_FMT, p.Id()), &sb, p, "load sandbox info")
287+
if err != nil {
288+
return err
289+
}
290+
return p.reconnectSandbox(sb.Id, sb.PersistInfo)
291+
}
292+
293+
func saveMessage(db *daemondb.DaemonDB, key string, message proto.Message, owner hlog.LogOwner, op string) error {
126294
pm, err := proto.Marshal(message)
127295
if err != nil {
128296
hlog.HLog(ERROR, owner, 2, "failed to serialize %s: %v", op, err)
129297
return err
130298
}
131-
err = p.factory.db.Update([]byte(key),pm)
299+
err = db.Update([]byte(key), pm)
132300
if err != nil {
133301
hlog.HLog(ERROR, owner, 2, "failed to write %s to db: %v", op, err)
134302
return err
135303
}
136304
hlog.HLog(DEBUG, owner, 2, "%s serialized to db", op)
137305
return nil
138-
}
306+
}
307+
308+
func loadMessage(db *daemondb.DaemonDB, key string, message proto.Message, owner hlog.LogOwner, op string) error {
309+
v, err := db.Get([]byte(key))
310+
if err != nil {
311+
hlog.HLog(ERROR, owner, 2, "failed to load %s: %v", op, err)
312+
return err
313+
}
314+
err = proto.Unmarshal(v, message)
315+
if err != nil {
316+
hlog.HLog(ERROR, owner, 2, "failed to unpack loaded %s: %v", op, err)
317+
return err
318+
}
319+
return nil
320+
}

0 commit comments

Comments
 (0)