forked from jaysoffian/dotlock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdotlock.py
More file actions
366 lines (315 loc) · 11.8 KB
/
dotlock.py
File metadata and controls
366 lines (315 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python
# -*- mode: Python; tab-width: 4; indent-tabs-mode: nil; -*-
# ex: set tabstop=4 expandtab:
# Please do not change the two lines above. See PEP 8, PEP 263.
#
# $Id: dotlock.py 3511 2008-02-08 03:37:32Z jay $
'''
DotLock - A class to allow files to be locked over NFS reliably.
NFS historically did not provide locking. NFS itself is a stateless protocol on
the server side, and locking is inherently stateful. RPC lockd/statd were
added in NFSv3 to provide locking via lockf/flock/fcntl. However, they are
generally considered to be unreliable, and may not be available at all.
So-called dot locking is a mechanism which uses an auxiliary lock file as a
locking semaphore. Implemented properly it is reliable over NFS. Its primary
weakness is that there is no race-free way to resolve a stale-lock situation.
However, because stale locks are uncommon and there is a race-resistant way to
clear stale locks, this is not too bad.
Example usage:
    from dotlock import DotLock
    lock = DotLock(path)
    lock.acquire()
    # do stuff with path
    lock.release()
Example usage with a staleness check and a bounded number of attempts
(lock_is_valid stands for an application-specific check):
    def check_func(lock):
        return lock_is_valid(lock.path)
    lock = DotLock(path, check_func=check_func)
    if lock.acquire(max_attempts=5):
        try:
            # do stuff
        finally:
            lock.release()
'''
__all__ = ["DotLock"]
import os
import time
import string
import socket
from stat import ST_NLINK, ST_INO, ST_MTIME
# The thread id is folded into the lock data and temp-file name so that
# threads of one process do not collide. The module is "thread" on Python 2
# and "_thread" on Python 3; if neither exists the thread id falls back to 0.
try:
    import thread
except ImportError:
    try:
        import _thread as thread
    except ImportError:
        thread = None
# No True/False fallback is defined: both are builtins on every interpreter
# this code can run on (Python >= 2.2.1), and assigning to them is a
# SyntaxError on Python 3, which would make this module impossible to import.
class DotLockError(Exception):
    '''Error raised by DotLock, e.g. by refresh() when the lock is not held.'''
class DotLock:
    '''Lock a path by creating an auxiliary "<path>.lock" dot-lock file.

    Only processes that also use DotLock on the same path are coordinated;
    nothing here prevents other access to <path> itself.
    '''
    # See the paragraph on O_EXCL in the Linux open(2) man page for
    # background on the algorithm in use here. (In theory, O_EXCL works with
    # NFSv3, but this will work w/NFSv2.)

    # Minimum age in seconds (lock file mtime vs. local clock, adjusted for
    # skew) before is_stale() may treat a lock as stale.
    # N.B. should be at least 2 * hijack_delay; is_stale() enforces that.
    valid_lock_age = 60
    # Seconds acquire() sleeps between failed attempts.
    poll_interval = 15
    # Seconds _hijacklock() waits after overwriting a stale lock before
    # re-reading it to detect a competing hijacker.
    hijack_delay = 15

    def __init__(self, path, check_func=None):
        '''Instantiate a DotLock.

        The <path> argument is the file to be "locked" (the lock file itself
        is named "<path>.lock"). The optional argument <check_func> is called
        during acquire() to verify whether a lock that appears stale is still
        valid. It is passed the DotLock instance as an argument. It should
        return True if the lock is still valid, otherwise False.
        '''
        self.path = path
        self.lockpath = path + ".lock"
        self.check_func = check_func
        # Local clock minus the lock file's mtime, recorded by _trylock()
        # when it exceeds 1 second; subtracted from local time elsewhere.
        self._skew = 0
        # (st_ino, lockdata) of the lock file while we believe we hold it.
        self._lock = None

    def acquire(self, max_attempts=None):
        '''Acquire lock. Return True if successful, otherwise False.

        acquire() polls, sleeping poll_interval seconds between attempts,
        until it can acquire the lock. For a highly contested lock it is
        possible for acquire() to starve indefinitely. In such situations,
        pass the optional <max_attempts> argument and check the return value
        of acquire(): after that many failed attempts it returns False. A
        false value (None or 0) means poll forever.

        If this instance already holds the lock, True is returned at once.
        '''
        # If we wanted to prevent starvation of highly contested locks,
        # there's a couple options: 1) implement a queue, possibly as a
        # directory where waiting processes deposit a file containing their
        # name. the process which holds the lock could check this directory
        # and then pass the lock off to the next in-line (e.g, the waiting
        # process writes its locktemp into the directory. the process with
        # the lock releases by copying the contents of that file into the
        # extant lock and then deleting the locktemp from the directory. a
        # polling process would then need to check whether it now has the
        # lock. 2) The DotLock instance could keep track of how frequently it
        # is acquiring the lock. If the acquisition rate exceeds a
        # threshold, back-off to give other processes a chance to acquire the
        # lock.
        if self.is_locked():
            debug("* i already have the lock!")
            # Report success so "if lock.acquire():" does not mistake a
            # re-entrant call for a failure.
            return True
        attempts = 0
        while True:
            if self._trylock():
                debug("> lock acquired")
                return True
            if self.is_stale() and self._hijacklock():
                debug("> lock acquired (by hijacking)")
                return True
            if max_attempts:
                attempts = attempts + 1
                if attempts >= max_attempts:
                    return False
            time.sleep(self.poll_interval)

    def release(self):
        '''Release the lock.

        It is safe to call this even if the lock has not been acquired (or
        has been hijacked), in which case it is a no-op.
        '''
        if self._lock is None:
            debug("* i don't have the lock to release!")
            return
        if not self.is_locked():
            return
        unlink(self.lockpath)
        self._lock = None
        debug("< lock released")

    def refresh(self):
        '''Refresh the lock.

        If the lock will be held for longer than DotLock.valid_lock_age,
        refresh() should be called periodically to keep the lock from
        becoming stale. DotLockError will be raised if the lock has been
        hijacked due to not calling refresh() in time.
        '''
        # Update times on lock file so that other clients know it is valid.
        if not self.is_locked():
            raise DotLockError("Not locked")
        # Express "now" in the lock file's clock by removing local skew.
        time_t = time.time() - self._skew
        utime(self.lockpath, (time_t, time_t))

    def is_locked(self):
        '''Return True if the lock is (still) acquired, False otherwise.

        The lock counts as held only while the lock file has the inode and
        contents recorded at acquisition; otherwise it was hijacked, and the
        recorded lock is forgotten.
        '''
        if self._lock is None:
            return False
        # check if it was hijacked
        st_ino, lockdata = self._lock
        st = stat(self.lockpath)
        if st and st[ST_INO] == st_ino and readfile(self.lockpath) == lockdata:
            return True
        debug("* lock was hijacked")
        self._lock = None
        return False

    def is_stale(self):
        '''Return True if the lock appears stale, False otherwise.

        A missing lock file is not stale (the caller simply retries). The
        lock's age (skew-adjusted) must reach
        max(2 * hijack_delay, valid_lock_age); if it does, check_lock() is
        consulted to see whether the lock is valid even though its holder is
        not refreshing it.
        '''
        st = stat(self.lockpath)
        if not st:
            return False  # we'll try again...
        age = time.time() - st[ST_MTIME] - self._skew
        debug("* lock age %d (skew %d)" % (age, self._skew))
        if age < max(2 * self.hijack_delay, self.valid_lock_age):
            return False
        if self.check_lock():
            return False
        return True

    def check_lock(self):
        '''Return True if check_func says the lock is valid (not stale).

        check_lock() is called when a lock exceeds the stale age threshold.
        Normally that should not happen, as a proper lock holder calls
        refresh() periodically. Without a check_func this returns False.
        '''
        return bool(self.check_func and self.check_func(self))

    def _make_lockdata(self):
        '''Return (hostname, pid, tid, lockdata) identifying the caller.

        lockdata is "<hostname> <pid> <tid> <time>"; tid is 0 when no thread
        module is available.
        '''
        hostname = socket.gethostname()
        pid = os.getpid()
        tid = thread and thread.get_ident() or 0
        lockdata = "%s %s %s %s" % (hostname, pid, tid, time.time())
        return hostname, pid, tid, lockdata

    def _trylock(self):
        '''Try the dot-locking protocol once; return True if it succeeded.

        A uniquely named temp file is hard-linked to lockpath. link()'s own
        outcome is not trusted; success means the temp file's link count is
        2. On success the clock skew (local time minus the new file's mtime)
        is recorded when it exceeds 1 second.
        '''
        hostname, pid, tid, lockdata = self._make_lockdata()
        locktemp = "%stmp-%s-%s-%s" % (self.lockpath, hostname, pid, tid)
        writefile(locktemp, lockdata)
        try:
            link(locktemp, self.lockpath)
            st = stat(locktemp)
            if not st:
                return False
            if st[ST_NLINK] == 2:
                self._lock = (st[ST_INO], lockdata)
                skew = time.time() - st[ST_MTIME]
                if abs(skew) > 1:
                    self._skew = skew
        finally:
            # The temp name is only a vehicle for link(); always remove it.
            unlink(locktemp)
        return self._lock is not None

    def _hijacklock(self):
        '''Try hijacking a lock. Return True if we ended up holding it.'''
        # hijacking a lock is perilous. nothing prevents multiple processes
        # from doing it at once. to ameliorate the situation, we sleep awhile
        # after stealing the lock then check to see if any other clients have
        # stolen it. to hijack the lock, we just overwrite the existing lock.
        # this is safer than unlinking which has a race condition where two
        # processes can simultaneously unlink() what they think is a stale
        # lock, while a third process acquires the lock file in-between the
        # two unlinks()
        debug("* attempting to hijack the lock")
        lockdata = self._make_lockdata()[3]
        writefile(self.lockpath, lockdata)
        time.sleep(self.hijack_delay)
        if readfile(self.lockpath) != lockdata:
            return False
        st = stat(self.lockpath)
        if not st:
            return False
        self._lock = (st[ST_INO], lockdata)
        return True

    def __del__(self):
        # Best-effort cleanup if the owner forgot to call release().
        if self._lock is not None:
            self.release()
def writefile(path, data):
    '''Create/truncate <path> and write <data> to it (nothing if data is empty).

    Errors from open() or write() propagate to the caller.
    '''
    out = open(path, 'w')
    try:
        if not data:
            return
        out.write(data)
    finally:
        out.close()
def readfile(path):
    '''Return the full contents of <path>, or None on any OS/IO error.'''
    contents = None
    try:
        handle = open(path)
        try:
            contents = handle.read()
        finally:
            handle.close()
    except (IOError, OSError):
        # Covers a failing open(), read() or close(): report "unreadable".
        contents = None
    return contents
def utime(path, t):
    '''Set the (atime, mtime) pair <t> on <path>; OS/IO errors are ignored.'''
    try:
        os.utime(path, t)
    except (IOError, OSError):
        return None
def link(src, dst):
    '''Hard-link <dst> to <src>, ignoring OS/IO errors.

    Because failures are swallowed, callers must verify the outcome some
    other way (e.g. by inspecting the link count).
    '''
    try:
        os.link(src, dst)
    except (IOError, OSError):
        return None
def unlink(path):
    '''Remove <path>, ignoring OS/IO errors (e.g. it is already gone).'''
    try:
        os.unlink(path)
    except (IOError, OSError):
        return None
def stat(path):
    '''Return os.stat(<path>), or None when it cannot be stat'ed.'''
    result = None
    try:
        result = os.stat(path)
    except (IOError, OSError):
        pass
    return result
def debug(s):
    '''Default debug hook: discard <s>. test() rebinds it to debug_test.'''
    return None
def debug_test(s):
    '''Write <s> to stderr, prefixed with a UTC timestamp and the pid.'''
    import sys
    stamp = time.asctime(time.gmtime(time.time()))
    line = "%s [%s]: %s\n" % (stamp, os.getpid(), s)
    sys.stderr.write(line)
def test(path, run_time=300, num_procs=1):
    '''Stress-test DotLock by appending to a shared counter file.

    Each participant repeatedly acquires the lock on <path>, reads the last
    counter value from <path> (starting at 1 if the file is missing or
    empty), sleeps a second, appends "<n> <hostname>-<pid>", and releases
    the lock. About 5% of the time it instead keeps the lock without
    releasing it for 1.5 * valid_lock_age seconds, to exercise hijacking.

    With num_procs > 1, min(num_procs, 50) children are forked to do the
    work, and the parent returns after releasing them. <run_time> (seconds)
    and <num_procs> are passed through int(), so command-line strings work.
    Debug output is switched on by rebinding the module-level debug().
    '''
    import random
    global debug
    debug = debug_test
    run_time = int(run_time)
    num_procs = int(num_procs)
    if num_procs > 1:
        # fork a bunch of children, then release them all at once
        read_end, write_end = os.pipe()
        for _ in range(min(num_procs, 50)):
            start_delay = random.random()
            pid = os.fork()
            if not pid:
                break
        if pid:
            time.sleep(2)
            debug("release the hounds")
            # Closing the write end gives every child EOF on its read().
            os.close(read_end)
            os.close(write_end)
            return
        debug("waiting")
        os.close(write_end)
        reader = os.fdopen(read_end)
        try:
            reader.read()  # block till parent is ready
        finally:
            reader.close()
        time.sleep(start_delay)  # give them all a different start time
    end_time = time.time() + run_time
    lock = DotLock(path)
    # speed up testing a bit by turning down some of times
    lock.hijack_delay = 8
    lock.valid_lock_age = 16
    id_ = "%s-%s" % (socket.gethostname(), os.getpid())
    while time.time() < end_time:
        if not lock.acquire(1):
            time.sleep(1)
            continue
        if random.random() < .05:
            # simulate a hung process which doesn't release the lock
            debug("* hanging the lock")
            time.sleep(lock.valid_lock_age * 1.5)
            continue
        n = 1
        if os.path.exists(path):
            f = open(path)
            try:
                lines = f.readlines()
            finally:
                f.close()
            if lines:
                n = int(lines[-1].split()[0]) + 1
        # provide time for any race condition to show itself
        time.sleep(1)
        f = open(path, 'a')
        try:
            f.write("%s %s\n" % (n, id_))
        finally:
            f.close()
        debug("* wrote %s" % n)
        lock.release()
        # give someone else time to acquire the lock
        time.sleep(random.randint(1, 5))
    del lock
    debug("* exiting")
if __name__ == '__main__':
    # usage: dotlock.py <path> [<run_time> [<num_procs>]]
    import sys
    # apply() no longer exists on Python 3; argument unpacking works on both.
    test(*sys.argv[1:])