چون می‌خواستم به‌صورت یک API و فانکشن باشد، از helpers استفاده کردم.
from elasticsearch import helpers, Elasticsearch
# Module-level Elasticsearch client, constructed with no arguments (library
# defaults); it is the client passed to helpers.bulk in bulk_pg_to_es().
es = Elasticsearch()
In my model (SQLAlchemy):
def to_es_details(self):
    """Build the Elasticsearch bulk action describing this record.

    Returns:
        dict: A single action with ``_index``, ``_type`` and ``_id``
        metadata plus a ``_source`` document copied from this object's
        attributes. This is the shape passed to ``helpers.bulk``.
    """
    # NOTE(review): Elasticsearch 7+ reportedly rejects ':' in index names
    # and has deprecated/removed mapping types ('_type'). The values are
    # kept unchanged here -- confirm against the target cluster version.
    return {
        "_index": "library:book_details",
        "_type": "book_details",
        "_id": self.id,
        "_source": {
            "title": self.title,
            "publisher": self.publisher_uid,
            "lang": self.language,
            "category": self.category,
            "download_count": self.download_count,
            "price": self.price,
            "discount": self.discount,
            "created_at": self.created_epoch,
            "status": self.status,
            "view_count": self.view_count,
            "rate": self.rate,
        },
    }
def bulk_pg_to_es():
    """Index every non-expired ``Media`` row into Elasticsearch in one bulk call.

    Loads all ``Media`` rows whose ``status`` is not ``u'EXPIRED'`` through
    the module-level ``session``, converts each one with its
    ``to_es_details()`` method, and passes the resulting list of actions to
    ``helpers.bulk`` using the module-level ``es`` client.

    Returns:
        None. The result of ``helpers.bulk`` is discarded.
    """
    active_media = session.query(Media).filter(Media.status != u'EXPIRED').all()
    # Build the complete action list before sending, so any conversion error
    # surfaces before anything is written to Elasticsearch.
    actions = [item.to_es_details() for item in active_media]
    helpers.bulk(es, actions)