Saturday, 27 March 2021

Python Elasticsearch


A basic Elasticsearch connection from Python and a simple search. The main issue is converting the retrieved data into a form that suits your needs. To begin with, limit the data you receive by using the size option (size=10), and limit the retrieved fields to only the ones you need ("_source": ["field_x", ..., "field_y"]).
The biggest issue is the nested dict we receive from Elasticsearch; to convert it to a DataFrame we must use json_normalize.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Elasticsearch stuff to import

import ssl, certifi
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from elasticsearch import Elasticsearch, RequestsHttpConnection

# Panda stuff to import
from pandas import json_normalize

def main_search1():
    """Run a match query with the low-level client and print the hits.

    Connects to the Elasticsearch node at 127.0.0.1:9200 over HTTPS with
    certificate verification disabled, prints the cluster info, searches the
    ``syslog-2021.03.12`` index for documents whose ``user.name`` matches
    ``test``, prints the raw hits, and finally prints the same hits flattened
    into a pandas DataFrame by ``json_normalize``.

    Returns:
        None. Everything is written to stdout.
    """
    # Certificate checks are switched off through ``verify_certs=False`` below;
    # no separate SSL context object is built because nothing would consume it.
    es = Elasticsearch(
        hosts=[{'host': '127.0.0.1', 'port': 9200}],
        scheme="https",
        connection_class=RequestsHttpConnection,
        # enable SSL
        use_ssl=True,
        # explicit, so that the default value `True` is not used
        verify_certs=False,
        http_auth=("user", "password"),
    )
    print(es.info())

    # Search query on Elasticsearch.
    result = es.search(
        index="syslog-2021.03.12",
        body={
            # fields to retrieve from Elasticsearch
            "_source": ["cisco", "timestamp"],
            # search query
            "query": {
                "match": {
                    'user.name': 'test',
                },
            },
        },
        # number of results to retrieve
        size=10,
    )

    # ['hits']['hits'] holds only the matching documents; look it up once.
    all_hits = result['hits']['hits']
    print(all_hits)

    # Print every document together with its id and Python type.
    for doc in all_hits:
        print("DOC ID:", doc["_id"], "--->", doc, type(doc), "\n")

    # Flatten the nested hit dictionaries into DataFrame columns -->
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html
    res_content_pd = json_normalize(all_hits)
    print(res_content_pd)


# Run the low-level client example only when this file is executed as a script.
if __name__ == '__main__':
    main_search1()

Elasticsearch-dsl

The main problem with using the Elasticsearch API directly is the query (body) syntax: it is not human friendly, especially for first-time or heavy use in code, and it is hard to write, debug and execute correctly.
The main idea of Elasticsearch-dsl is to simplify the API's queries and filters.
 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Elasticsearch stuff to import

import ssl, certifi
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from elasticsearch import Elasticsearch, RequestsHttpConnection

# Panda stuff to import
from pandas import json_normalize
from elasticsearch_dsl import Search, Q
def main_search2():
    """Run a combined match query with elasticsearch-dsl and print the hits.

    Connects to the Elasticsearch node at 127.0.0.1:9200 over HTTPS with
    certificate verification disabled, prints the cluster info, builds a
    query that requires both ``user.name`` and ``observer.ip`` to match,
    prints the number of matching documents, fetches the first ten of them
    from the ``syslog-2021.03.12`` index, prints the raw hits, and prints two
    DataFrames built from the response in two different ways.

    Returns:
        None. Everything is written to stdout.
    """
    # Certificate checks are switched off through ``verify_certs=False`` below;
    # no separate SSL context object is built because nothing would consume it.
    es = Elasticsearch(
        hosts=[{'host': '127.0.0.1', 'port': 9200}],
        scheme="https",
        connection_class=RequestsHttpConnection,
        # enable SSL
        use_ssl=True,
        # explicit, so that the default value `True` is not used
        verify_certs=False,
        http_auth=("user", "password"),
    )
    print(es.info())

    # Search query with elasticsearch-dsl, a simpler way to write logical
    # queries. A dotted field name such as 'user.name' is not a valid Python
    # keyword argument, so it is passed by unpacking a dict with **; plain
    # field names do not need the **.
    query = Q('match', **{'user.name': 'test'}) & Q('match', **{'observer.ip': '1.1.1.1'})

    # Define the index and the number of results to retrieve (size option).
    # NOTE(review): the s[0:10] slice further down looks like it supersedes
    # this size -- confirm against the elasticsearch-dsl pagination docs.
    s = Search(using=es, index='syslog-2021.03.12').query(query).extra(size=4000)

    # Restrict the returned fields to the ones that are needed.
    s = s.source(['timestamp', 'cisco'])

    # Count the number of results.
    total = s.count()
    print(total)

    # Define the number of results to retrieve.
    s = s[0:10]

    # Execute the search and get a Response instance wrapping all the data.
    # When retrieving a big data set use scan(), which returns a generator
    # that iterates over all the documents matching the query.
    res_content = s.execute()

    # ['hits']['hits'] holds only the matching documents; look it up once.
    all_hits = res_content['hits']['hits']
    print(all_hits)

    # Print every document together with its id and Python type.
    for doc in all_hits:
        print("DOC ID:", doc["_id"], "--->", doc, type(doc), "\n")

    # Convert to a pandas DataFrame by normalizing the dictionaries -->
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html
    results = [d.to_dict() for d in res_content]
    res_content_pd1 = json_normalize(results)
    print(res_content_pd1)

    # Alternative route through the raw ['hits']['hits'] list; the original
    # author notes this way of building the DataFrame is less efficient.
    res_filtered = [x['_source'].to_dict() for x in all_hits]
    res_content_pd2 = json_normalize(res_filtered)
    print(res_content_pd2)


# Run the elasticsearch-dsl example only when this file is executed as a script.
if __name__ == '__main__':
    main_search2()


The main difference between Elasticsearch and Elasticsearch-dsl is how the query is written:

query = Q('match', **{'user.name':'test'}) & Q('match', **{'observer.ip':'1.1.1.1'})

We can define match queries and combine them with logical operators (and, or, etc.).
To match a field nested inside another field we must use ** to unpack a dict whose key is the dotted path to the sub-field (for example 'user.name').
Section "Dotted fields"