#!/usr/bin/python # This file is part of the Luau programming language and is licensed under MIT License; see LICENSE.txt for details # Given two heap snapshots (A & B), this tool performs reachability analysis on new objects allocated in B # This is useful to find memory leaks - reachability analysis answers the question "why is this set of objects not freed" # This tool can also be ran with just one snapshot, in which case it displays all allocated objects # The result of analysis is a .svg file which can be viewed in a browser # To generate these dumps, use luaC_dump, ideally preceded by luaC_fullgc import argparse import json import sys import svg argumentParser = argparse.ArgumentParser(description='Luau heap snapshot analyzer') argumentParser.add_argument('--split', dest = 'split', type = str, default = 'none', help = 'Perform additional root split using memory categories', choices = ['none', 'custom', 'all']) argumentParser.add_argument('snapshot') argumentParser.add_argument('snapshotnew', nargs='?') arguments = argumentParser.parse_args() class Node(svg.Node): def __init__(self): svg.Node.__init__(self) self.size = 0 self.count = 0 # data for memory category filtering self.objects = [] self.categories = set() def text(self): return self.name def title(self): return self.name def details(self, root): return "{} ({:,} bytes, {:.1%}); self: {:,} bytes in {:,} objects".format(self.name, self.width, self.width / root.width, self.size, self.count) # load files if arguments.snapshotnew == None: dumpold = None with open(arguments.snapshot) as f: dump = json.load(f) else: with open(arguments.snapshot) as f: dumpold = json.load(f) with open(arguments.snapshotnew) as f: dump = json.load(f) # reachability analysis: how much of the heap is reachable from roots? visited = set() queue = [] offset = 0 root = Node() for name, addr in dump["roots"].items(): queue.append((addr, root.child(name))) while offset < len(queue): addr, node = queue[offset] offset += 1 if addr in visited: continue visited.add(addr) obj = dump["objects"][addr] if not dumpold or not addr in dumpold["objects"]: node.count += 1 node.size += obj["size"] node.objects.append(obj) if obj["type"] == "table": pairs = obj.get("pairs", []) for i in range(0, len(pairs), 2): key = pairs[i+0] val = pairs[i+1] if key and val and dump["objects"][key]["type"] == "string": queue.append((key, node)) queue.append((val, node.child(dump["objects"][key]["data"]))) else: if key: queue.append((key, node)) if val: queue.append((val, node)) for a in obj.get("array", []): queue.append((a, node)) if "metatable" in obj: queue.append((obj["metatable"], node.child("__meta"))) elif obj["type"] == "function": queue.append((obj["env"], node.child("__env"))) source = "" if "proto" in obj: proto = dump["objects"][obj["proto"]] if "source" in proto: source = proto["source"] if "proto" in obj: queue.append((obj["proto"], node.child("__proto"))) for a in obj.get("upvalues", []): queue.append((a, node.child(source))) elif obj["type"] == "userdata": if "metatable" in obj: queue.append((obj["metatable"], node.child("__meta"))) elif obj["type"] == "thread": queue.append((obj["env"], node.child("__env"))) for a in obj.get("stack", []): queue.append((a, node.child("__stack"))) elif obj["type"] == "proto": for a in obj.get("constants", []): queue.append((a, node)) for a in obj.get("protos", []): queue.append((a, node)) elif obj["type"] == "upvalue": if "object" in obj: queue.append((obj["object"], node)) def annotateContainedCategories(node, start): for obj in node.objects: if obj["cat"] < start: obj["cat"] = 0 node.categories.add(obj["cat"]) for child in node.children.values(): annotateContainedCategories(child, start) for cat in child.categories: node.categories.add(cat) def filteredTreeForCategory(node, category): children = {} for c in node.children.values(): if category in c.categories: filtered = filteredTreeForCategory(c, category) if filtered: children[filtered.name] = filtered if len(children): result = Node() result.name = node.name # re-count the objects with the correct category that we have for obj in node.objects: if obj["cat"] == category: result.count += 1 result.size += obj["size"] result.children = children return result else: result = Node() result.name = node.name # re-count the objects with the correct category that we have for obj in node.objects: if obj["cat"] == category: result.count += 1 result.size += obj["size"] if result.count != 0: return result return None def splitIntoCategories(root): result = Node() for i in range(0, 256): filtered = filteredTreeForCategory(root, i) if filtered: name = dump["stats"]["categories"][str(i)]["name"] filtered.name = name result.children[name] = filtered return result if dump["stats"].get("categories") and arguments.split != 'none': if arguments.split == 'custom': annotateContainedCategories(root, 128) else: annotateContainedCategories(root, 0) root = splitIntoCategories(root) svg.layout(root, lambda n: n.size) svg.display(root, "Memory Graph", "cold")