Azure Blob StorageをPythonから操作する、前記事の続きです。さらに工夫したものは別の記事に分けていますので最終形態はそちらで。
引き続き検証環境はPython3.6+Windows 10 Pro、実装はPython3前提です。
さて、基本操作は前の記事で実装しましたが、フラットな構造に対応するために「ディレクトリ直下のファイルのみ同期」、というのはちょっと雑でした。
今回は、元のローカルパスを合わせてBLOBのメタデータとして記録することで、何とかツリー構造に対応させたいと思います。
もくじ:
メタデータの利用
今回はBLOB名をランダム文字列で作成し、メタデータを使ってローカルのパスを記録させ、サブディレクトリ含めツリー構造を保持することにします。
サブディレクトリを含むとなると、多数のファイルが存在するような場合が厄介なことになりますが、まあ今回は目をつぶります。ファイル数が気になる場合は書庫に固めてしまうなどの代替案が考えられますが、今回は触れません。
ついでにメタデータに他のファイル情報も書いておくことにします。うまくやれば、一定以下のサイズのファイルのみ転送したり、最近に更新があったもののみ転送する、などの使い方ができます。
今回のスクリプトです。前記事の_blob_client
クラスを変更しています。
ちょこちょこ変更していますが基本は同じ。変更点を以下に記載していきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | import os import random import string import urllib from datetime import datetime from azure.storage.blob import BlockBlobService class _blob_client: def __init__(self, conf): # check if the home directory path is valid self.path = conf["home_path"] if not os.path.isdir(conf["home_path"]): raise Exception("home directory is not found") # build client self.client = BlockBlobService( account_name=conf["account_name"], account_key=conf["account_key"]) # set proxy if enabled if conf["proxy_enabled"]: self.client.set_proxy(conf["proxy_addr"], conf["proxy_port"], user=conf["proxy_user"], password=conf["proxy_pass"]) # set container name with checking if conf["container"] in [x.name for x in self.client.list_containers()]: self.__container = conf["container"] else: raise Exception("invalid container name") def __metadata(self, path_): # returns file attributes as (str, str) dictionary mtime_ = datetime.fromtimestamp(os.path.getmtime(path_)) return {"path": os.path.relpath(path_, self.path), "updated": mtime_.strftime('%Y-%m-%d, %H:%M:%S'), "size": "{}".format(os.path.getsize(path_))} def download(self, blob_name): # make local directory meta_ = self.client.get_blob_metadata( container_name=self.__container, blob_name=blob_name) file_path = os.path.join( self.path, urllib.parse.unquote(meta_["path"])) os.makedirs(os.path.dirname(file_path), exist_ok=True) # download blob print("receiving blob: {}".format(blob_name)) self.client.get_blob_to_path(container_name=self.__container, blob_name=blob_name, file_path=file_path) # modify timestamp mtime_ = datetime.strptime(meta_["updated"], '%Y-%m-%d, %H:%M:%S') mtime_ = mtime_.timestamp() os.utime(file_path, (mtime_, mtime_)) def upload(self, local_path): # check file path if not os.path.isfile(local_path): raise Exception("invalid file path: {}".format(local_path)) # generate blob name name_ = ''.join(random.choices( string.ascii_letters + string.digits, k=16)) # create blob from local file print("creating blob: {}...".format(name_)) meta_ = self.__metadata(local_path) meta_["path"] = urllib.parse.quote(meta_["path"]) self.client.create_blob_from_path(container_name=self.__container, blob_name=name_, file_path=local_path, metadata=meta_) def fetch_remote(self): # get block blob info from storage container blob_info = [] print("getting blob list...") for blob in self.client.list_blobs(container_name=self.__container): meta_ = self.client.get_blob_metadata(container_name=self.__container, blob_name=blob.name) if meta_ is None: print("skip blob {} due to missing metadata".format(blob.name)) continue blob_info.append({"name": blob.name, "path": urllib.parse.unquote(meta_["path"]), "updated": meta_["updated"], "size": meta_["size"]}) return blob_info def fetch_local(self): # scan local directory and get file info file_info = [] for root, dirs, files in os.walk(self.path): for file_name in files: file_path = os.path.join(root, file_name) file_info.append(self.__metadata(file_path)) return file_info def clear(self, blob_name=None): # remove block blob on storage container blobs = [x.name for x in self.client.list_blobs(self.__container)] if blob_name is not None: blobs = [x for x in blobs if x == blob_name] for blob in blobs: print("removing blob: {}".format(blob)) self.client.delete_blob(container_name=self.__container, blob_name=blob) |
メタデータの設定
メタデータはBLOBの作成時(ローカルファイルのアップロード時)に設定できる他、既存のBLOBに対して個別に設定できます。
今回のスクリプトでは_blob_client.upload()
メソッドを改変し、内部のcreate_blob_from_path()
メソッドにメタデータとして辞書を渡して設定します。ただし、内容(key-valueのペア)は文字列である必要があります。
1 2 3 4 | self.client.create_blob_from_path(container_name=self.__container, blob_name=name_, file_path=local_path, metadata=meta_) |
今回はローカルにあるファイルのパス(対象ディレクトリからの相対パス)、最終更新日時、サイズ(バイト数)をメタデータとして設定します。
ローカルのファイルとBLOBの属性を比較できるように、ローカルファイルの検索時(_blob_client.fetch_local
メソッド)にも同等の情報を取得できるようにしました。
一方で、BLOB名にはランダムな文字列で生成したIDをセットしています。生成方法は何でもいいのですが、ファイル名のみでは重複する可能性があるのでこうしました。
1 2 3 | # generate blob name name_ = ''.join(random.choices( string.ascii_letters + string.digits, k=16)) |
メタデータの取得
get_blob_metadata()
メソッドを呼んで、特定のBLOBのメタデータ(辞書型)を取得します。
ここではダウンロードの際に(ダウンロード先の)ローカルファイルを作成すべきパスを生成するために使います。
1 2 | meta_ = self.client.get_blob_metadata(container_name=self.__container, blob_name=blob.name) |
ちなみに、ダウンロード時には途中のディレクトリが存在しない場合が考えられるので、明示的に作成しています。
1 | os.makedirs(os.path.dirname(file_path), exist_ok=True) |
日本語対応
マルチバイト文字といった方が正確かもしれませんが、メタデータに使える文字列には制約があり、日本語が使えません。今回はローカルのファイルのパスをメタデータに設定しているので、日本語を含む名前のファイルやディレクトリがあると困ったことになります。仕方ないのでURLエンコードを使います。
ちなみにBLOB名には日本語が使えます。おそらく内部的にエンコード(パーセントエスケープ)しているのでしょう。
ここではアップロード時にエンコードし、ダウンロード時にデコードしています。いずれもurllib.parse
モジュールを使用しています。
アップロードのとき:
1 2 | meta_ = self.__metadata(local_path) meta_["path"] = urllib.parse.quote(meta_["path"]) |
ダウンロード、またはBLOB一覧表示のとき:
1 2 | file_path = os.path.join( self.path, urllib.parse.unquote(meta_["path"])) |
ファイル時刻の変更
何も考えずAzure Storageを経由すると、ファイルのメタ情報(タイムスタンプやパーミッション)が保持されません。
今回は最終更新日時のみ保持する仕組みを作っておくことにしました。
具体的には、メタデータに最終更新日時を書いてアップロードし、またダウンロード後にファイルのタイムスタンプを設定しなおす、という操作を行います。
BLOB自身にもタイムスタンプを表すプロパティがあるのですが、今回はメタデータにまとめるということで。
1 2 3 4 5 6 | def __metadata(self, path_): # returns file attributes as (str, str) dictionary mtime_ = datetime.fromtimestamp(os.path.getmtime(path_)) return {"path": os.path.relpath(path_, self.path), "updated": mtime_.strftime('%Y-%m-%d, %H:%M:%S'), "size": "{}".format(os.path.getsize(path_))} |
os.utime
メソッドに同じ最終アクセス日時と最終更新日時をセットして、ファイルのタイムスタンプを書き換えます。
1 2 3 | mtime_ = datetime.strptime(meta_["updated"], '%Y-%m-%d, %H:%M:%S') mtime_ = mtime_.timestamp() os.utime(file_path, (mtime_, mtime_)) |
また、各メソッドにファイルパスやBLOB名を個別に指定できるよう引数を変更しています。使い方は次回まとめて記載します。