jack2 codebase
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

384 lines
8.7KB

  1. #! /usr/bin/env python
  2. # encoding: utf-8
  3. # Thomas Nagy, 2011-2015 (ita)
  4. """
A client for the network cache (playground/netcache/). Launch the server with
./netcache_server, then use it for the builds by adding the following:

    def build(bld):
        bld.load('netcache_client')

The parameters should be present in the environment in the form:

    NETCACHE=host:port waf configure build

Or in a more detailed way:

    NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build

where:
    host: host where the server resides, by default localhost
    port: by default push on 11001 and pull on 12001

Use the server provided in playground/netcache/Netcache.java
  17. """
  18. import os, socket, time, atexit, sys
  19. from waflib import Task, Logs, Utils, Build, Runner
  20. from waflib.Configure import conf
# Upper bound on the number of bytes requested per recv()/read() call when
# streaming file contents or the server listing
BUF = 8192 * 16
# Request headers are padded to this length before being sent, and
# read_header() reads exactly this many bytes
HEADER_SIZE = 128
# NOTE(review): not referenced in the visible part of this file; presumably
# the supported cache modes - confirm before relying on it
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
# Maximum age of the cached server listing before check_cache() asks the
# server again; a false value disables the listing check entirely
STALE_TIME = 30 # seconds
# Command verbs placed first in the comma-separated request header
GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'
# (time of the last listing, entries of that listing); the 0.0 timestamp
# makes the first check_cache() call fetch the listing
all_sigs_in_cache = (0.0, [])
  30. def put_data(conn, data):
  31. if sys.hexversion > 0x3000000:
  32. data = data.encode('iso8859-1')
  33. cnt = 0
  34. while cnt < len(data):
  35. sent = conn.send(data[cnt:])
  36. if sent == 0:
  37. raise RuntimeError('connection ended')
  38. cnt += sent
  39. push_connections = Runner.Queue(0)
  40. pull_connections = Runner.Queue(0)
  41. def get_connection(push=False):
  42. # return a new connection... do not forget to release it!
  43. try:
  44. if push:
  45. ret = push_connections.get(block=False)
  46. else:
  47. ret = pull_connections.get(block=False)
  48. except Exception:
  49. ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  50. if push:
  51. ret.connect(Task.push_addr)
  52. else:
  53. ret.connect(Task.pull_addr)
  54. return ret
  55. def release_connection(conn, msg='', push=False):
  56. if conn:
  57. if push:
  58. push_connections.put(conn)
  59. else:
  60. pull_connections.put(conn)
  61. def close_connection(conn, msg=''):
  62. if conn:
  63. data = '%s,%s' % (BYE, msg)
  64. try:
  65. put_data(conn, data.ljust(HEADER_SIZE))
  66. except:
  67. pass
  68. try:
  69. conn.close()
  70. except:
  71. pass
  72. def close_all():
  73. for q in (push_connections, pull_connections):
  74. while q.qsize():
  75. conn = q.get()
  76. try:
  77. close_connection(conn)
  78. except:
  79. # ignore errors when cleaning up
  80. pass
  81. atexit.register(close_all)
  82. def read_header(conn):
  83. cnt = 0
  84. buf = []
  85. while cnt < HEADER_SIZE:
  86. data = conn.recv(HEADER_SIZE - cnt)
  87. if not data:
  88. #import traceback
  89. #traceback.print_stack()
  90. raise ValueError('connection ended when reading a header %r' % buf)
  91. buf.append(data)
  92. cnt += len(data)
  93. if sys.hexversion > 0x3000000:
  94. ret = ''.encode('iso8859-1').join(buf)
  95. ret = ret.decode('iso8859-1')
  96. else:
  97. ret = ''.join(buf)
  98. return ret
  99. def check_cache(conn, ssig):
  100. """
  101. List the files on the server, this is an optimization because it assumes that
  102. concurrent builds are rare
  103. """
  104. global all_sigs_in_cache
  105. if not STALE_TIME:
  106. return
  107. if time.time() - all_sigs_in_cache[0] > STALE_TIME:
  108. params = (LST,'')
  109. put_data(conn, ','.join(params).ljust(HEADER_SIZE))
  110. # read what is coming back
  111. ret = read_header(conn)
  112. size = int(ret.split(',')[0])
  113. buf = []
  114. cnt = 0
  115. while cnt < size:
  116. data = conn.recv(min(BUF, size-cnt))
  117. if not data:
  118. raise ValueError('connection ended %r %r' % (cnt, size))
  119. buf.append(data)
  120. cnt += len(data)
  121. if sys.hexversion > 0x3000000:
  122. ret = ''.encode('iso8859-1').join(buf)
  123. ret = ret.decode('iso8859-1')
  124. else:
  125. ret = ''.join(buf)
  126. all_sigs_in_cache = (time.time(), ret.splitlines())
  127. Logs.debug('netcache: server cache has %r entries' % len(all_sigs_in_cache[1]))
  128. if not ssig in all_sigs_in_cache[1]:
  129. raise ValueError('no file %s in cache' % ssig)
  130. class MissingFile(Exception):
  131. pass
  132. def recv_file(conn, ssig, count, p):
  133. check_cache(conn, ssig)
  134. params = (GET, ssig, str(count))
  135. put_data(conn, ','.join(params).ljust(HEADER_SIZE))
  136. data = read_header(conn)
  137. size = int(data.split(',')[0])
  138. if size == -1:
  139. raise MissingFile('no file %s - %s in cache' % (ssig, count))
  140. # get the file, writing immediately
  141. # TODO a tmp file would be better
  142. f = open(p, 'wb')
  143. cnt = 0
  144. while cnt < size:
  145. data = conn.recv(min(BUF, size-cnt))
  146. if not data:
  147. raise ValueError('connection ended %r %r' % (cnt, size))
  148. f.write(data)
  149. cnt += len(data)
  150. f.close()
  151. def sock_send(conn, ssig, cnt, p):
  152. #print "pushing %r %r %r" % (ssig, cnt, p)
  153. size = os.stat(p).st_size
  154. params = (PUT, ssig, str(cnt), str(size))
  155. put_data(conn, ','.join(params).ljust(HEADER_SIZE))
  156. f = open(p, 'rb')
  157. cnt = 0
  158. while cnt < size:
  159. r = f.read(min(BUF, size-cnt))
  160. while r:
  161. k = conn.send(r)
  162. if not k:
  163. raise ValueError('connection ended')
  164. cnt += k
  165. r = r[k:]
  166. def can_retrieve_cache(self):
  167. if not Task.pull_addr:
  168. return False
  169. if not self.outputs:
  170. return False
  171. self.cached = False
  172. cnt = 0
  173. sig = self.signature()
  174. ssig = Utils.to_hex(self.uid() + sig)
  175. conn = None
  176. err = False
  177. try:
  178. try:
  179. conn = get_connection()
  180. for node in self.outputs:
  181. p = node.abspath()
  182. recv_file(conn, ssig, cnt, p)
  183. cnt += 1
  184. except MissingFile as e:
  185. Logs.debug('netcache: file is not in the cache %r' % e)
  186. err = True
  187. except Exception as e:
  188. Logs.debug('netcache: could not get the files %r' % e)
  189. err = True
  190. # broken connection? remove this one
  191. close_connection(conn)
  192. conn = None
  193. finally:
  194. release_connection(conn)
  195. if err:
  196. return False
  197. for node in self.outputs:
  198. node.sig = sig
  199. #if self.generator.bld.progress_bar < 1:
  200. # self.generator.bld.to_log('restoring from cache %r\n' % node.abspath())
  201. self.cached = True
  202. return True
  203. @Utils.run_once
  204. def put_files_cache(self):
  205. if not Task.push_addr:
  206. return
  207. if not self.outputs:
  208. return
  209. if getattr(self, 'cached', None):
  210. return
  211. #print "called put_files_cache", id(self)
  212. bld = self.generator.bld
  213. sig = self.signature()
  214. ssig = Utils.to_hex(self.uid() + sig)
  215. conn = None
  216. cnt = 0
  217. try:
  218. for node in self.outputs:
  219. # We could re-create the signature of the task with the signature of the outputs
  220. # in practice, this means hashing the output files
  221. # this is unnecessary
  222. try:
  223. if not conn:
  224. conn = get_connection(push=True)
  225. sock_send(conn, ssig, cnt, node.abspath())
  226. except Exception as e:
  227. Logs.debug("netcache: could not push the files %r" % e)
  228. # broken connection? remove this one
  229. close_connection(conn)
  230. conn = None
  231. cnt += 1
  232. finally:
  233. release_connection(conn, push=True)
  234. bld.task_sigs[self.uid()] = self.cache_sig
  235. def hash_env_vars(self, env, vars_lst):
  236. # reimplement so that the resulting hash does not depend on local paths
  237. if not env.table:
  238. env = env.parent
  239. if not env:
  240. return Utils.SIG_NIL
  241. idx = str(id(env)) + str(vars_lst)
  242. try:
  243. cache = self.cache_env
  244. except AttributeError:
  245. cache = self.cache_env = {}
  246. else:
  247. try:
  248. return self.cache_env[idx]
  249. except KeyError:
  250. pass
  251. v = str([env[a] for a in vars_lst])
  252. v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
  253. m = Utils.md5()
  254. m.update(v.encode())
  255. ret = m.digest()
  256. Logs.debug('envhash: %r %r', ret, v)
  257. cache[idx] = ret
  258. return ret
  259. def uid(self):
  260. # reimplement so that the signature does not depend on local paths
  261. try:
  262. return self.uid_
  263. except AttributeError:
  264. m = Utils.md5()
  265. src = self.generator.bld.srcnode
  266. up = m.update
  267. up(self.__class__.__name__.encode())
  268. for x in self.inputs + self.outputs:
  269. up(x.path_from(src).encode())
  270. self.uid_ = m.digest()
  271. return self.uid_
  272. def make_cached(cls):
  273. if getattr(cls, 'nocache', None):
  274. return
  275. m1 = cls.run
  276. def run(self):
  277. if self.can_retrieve_cache():
  278. return 0
  279. return m1(self)
  280. cls.run = run
  281. m2 = cls.post_run
  282. def post_run(self):
  283. bld = self.generator.bld
  284. ret = m2(self)
  285. if bld.cache_global:
  286. self.put_files_cache()
  287. return ret
  288. cls.post_run = post_run
  289. @conf
  290. def setup_netcache(ctx, push_addr, pull_addr):
  291. Task.Task.can_retrieve_cache = can_retrieve_cache
  292. Task.Task.put_files_cache = put_files_cache
  293. Task.Task.uid = uid
  294. Task.push_addr = push_addr
  295. Task.pull_addr = pull_addr
  296. Build.BuildContext.hash_env_vars = hash_env_vars
  297. ctx.cache_global = True
  298. for x in Task.classes.values():
  299. make_cached(x)
  300. def build(bld):
  301. if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
  302. Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001')
  303. os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
  304. os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'
  305. if 'NETCACHE' in os.environ:
  306. if not 'NETCACHE_PUSH' in os.environ:
  307. os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
  308. if not 'NETCACHE_PULL' in os.environ:
  309. os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']
  310. v = os.environ['NETCACHE_PULL']
  311. if v:
  312. h, p = v.split(':')
  313. pull_addr = (h, int(p))
  314. else:
  315. pull_addr = None
  316. v = os.environ['NETCACHE_PUSH']
  317. if v:
  318. h, p = v.split(':')
  319. push_addr = (h, int(p))
  320. else:
  321. push_addr = None
  322. setup_netcache(bld, push_addr, pull_addr)