OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 import argparse |
| 4 import os |
| 5 |
| 6 import boto |
| 7 from boto.compat import json |
| 8 from boto.dynamodb.schema import Schema |
| 9 |
| 10 |
| 11 DESCRIPTION = """Load data into one or more DynamoDB tables. |
| 12 |
| 13 For each table, data is read from two files: |
| 14 - {table_name}.metadata for the table's name, schema and provisioned |
| 15 throughput (only required if creating the table). |
| 16 - {table_name}.data for the table's actual contents. |
| 17 |
| 18 Both files are searched for in the current directory. To read them from |
| 19 somewhere else, use the --in-dir parameter. |
| 20 |
| 21 This program does not wipe the tables prior to loading data. However, any |
| 22 items present in the data files will overwrite the table's contents. |
| 23 """ |
| 24 |
| 25 |
| 26 def _json_iterload(fd): |
| 27 """Lazily load newline-separated JSON objects from a file-like object.""" |
| 28 buffer = "" |
| 29 eof = False |
| 30 while not eof: |
| 31 try: |
| 32 # Add a line to the buffer |
| 33 buffer += fd.next() |
| 34 except StopIteration: |
| 35 # We can't let that exception bubble up, otherwise the last |
| 36 # object in the file will never be decoded. |
| 37 eof = True |
| 38 try: |
| 39 # Try to decode a JSON object. |
| 40 json_object = json.loads(buffer.strip()) |
| 41 |
| 42 # Success: clear the buffer (everything was decoded). |
| 43 buffer = "" |
| 44 except ValueError: |
| 45 if eof and buffer.strip(): |
| 46 # No more lines to load and the buffer contains something other |
| 47 # than whitespace: the file is, in fact, malformed. |
| 48 raise |
| 49 # We couldn't decode a complete JSON object: load more lines. |
| 50 continue |
| 51 |
| 52 yield json_object |
| 53 |
| 54 |
| 55 def create_table(metadata_fd): |
| 56 """Create a table from a metadata file-like object.""" |
| 57 |
| 58 |
| 59 def load_table(table, in_fd): |
| 60 """Load items into a table from a file-like object.""" |
| 61 for i in _json_iterload(in_fd): |
| 62 # Convert lists back to sets. |
| 63 data = {} |
| 64 for k, v in i.iteritems(): |
| 65 if isinstance(v, list): |
| 66 data[k] = set(v) |
| 67 else: |
| 68 data[k] = v |
| 69 table.new_item(attrs=data).put() |
| 70 |
| 71 |
| 72 def dynamodb_load(tables, in_dir, create_tables): |
| 73 conn = boto.connect_dynamodb() |
| 74 for t in tables: |
| 75 metadata_file = os.path.join(in_dir, "%s.metadata" % t) |
| 76 data_file = os.path.join(in_dir, "%s.data" % t) |
| 77 if create_tables: |
| 78 with open(metadata_file) as meta_fd: |
| 79 metadata = json.load(meta_fd) |
| 80 table = conn.create_table( |
| 81 name=t, |
| 82 schema=Schema(metadata["schema"]), |
| 83 read_units=metadata["read_units"], |
| 84 write_units=metadata["write_units"], |
| 85 ) |
| 86 table.refresh(wait_for_active=True) |
| 87 else: |
| 88 table = conn.get_table(t) |
| 89 |
| 90 with open(data_file) as in_fd: |
| 91 load_table(table, in_fd) |
| 92 |
| 93 |
| 94 if __name__ == "__main__": |
| 95 parser = argparse.ArgumentParser( |
| 96 prog="dynamodb_load", |
| 97 description=DESCRIPTION |
| 98 ) |
| 99 parser.add_argument( |
| 100 "--create-tables", |
| 101 action="store_true", |
| 102 help="Create the tables if they don't exist already (without this flag,
attempts to load data into non-existing tables fail)." |
| 103 ) |
| 104 parser.add_argument("--in-dir", default=".") |
| 105 parser.add_argument("tables", metavar="TABLES", nargs="+") |
| 106 |
| 107 namespace = parser.parse_args() |
| 108 |
| 109 dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables) |
OLD | NEW |