Memory leak while indexing data ...


Memory leak while indexing data ...

Zaharije

Hi

We are doing single-index updates via a Python script (there is sample code at
the end of this email), but after a few million documents we see slowdowns and
"Long GC collection" warnings. It seems we are having a similar problem to the
one described in
http://elasticsearch-users.115913.n3.nabble.com/lack-of-memory-td762199.html#a762199,
but I did not find any solution in that thread.

We have an 8-node cluster (8 GB of RAM). We are using 24 shards and 2
replicas. Those numbers are a rough estimate for 1 billion documents, but the
test failed far short of that number. (A side question: is 24 shards x 2
replicas too large? And how does ES handle the per-shard index - does it need
to be held entirely in memory?)
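
For reference, the back-of-the-envelope shard math for that layout (my own
rough numbers, assuming the 24 primaries and their 2 replica copies end up
spread evenly across the 8 nodes):

# Rough count of Lucene shard copies implied by the settings above
# (assumption: even distribution across the cluster).
primaries = 24
replicas  = 2
nodes     = 8

total_copies    = primaries * (1 + replicas)   # 72 shard copies cluster-wide
copies_per_node = total_copies / nodes         # ~9 shard copies per node
print 'total shard copies:', total_copies
print 'copies per node:   ', copies_per_node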

We used the following configuration:

cluster:
    name: ES-test

discovery:
    type: jgroups
    jgroups:
        config: tcp
        bind_port: 7800
        bind_addr: katta
        tcpping:
            initial_hosts: katta[7800],k00[7800],k01[7800],k02[7800],k03[7800],k04[7800],k06[7800],k07[7800]

gateway.fs.location: /search-sharing
gateway.type: fs

NFS is used for the gateway.

Here is a dump from the DEBUG log:

[11:18:39,215][INFO ][cluster.metadata         ] [Gaia] Index [users0]: Update mapping [id120071] (dynamic)
[11:18:52,367][WARN ][monitor.jvm              ] [Gaia] Long GC collection occurred, took [13s], breached threshold [10s]
[11:19:04,513][WARN ][monitor.jvm              ] [Gaia] Long GC collection occurred, took [12s], breached threshold [10s]
[11:19:11,453][WARN ][jgroups.FC               ] Received two credit requests from k07-63227 without any intervening messages; sending 1981561 credits
[11:19:23,380][WARN ][monitor.jvm              ] [Gaia] Long GC collection occurred, took [18.7s], breached threshold [10s]
[11:19:23,380][DEBUG][action.index             ] [Gaia] [users0][4], Node[katta-26574], [P], S[STARTED]: Failed to execute [[users0][id660070][4729da78-2392-4c41-9534-957be5ba1984], source[{"air_class": "coach", "count": "true", "anual_income": 4314, "hotel": "starwood", "zipcode": 94365, "sex": "female", "net_worth": 64362}]]
java.lang.NullPointerException
        at org.elasticsearch.index.mapper.xcontent.XContentNumberFieldMapper$CachedNumericTokenStream.close(XContentNumberFieldMapper.java:216)
        at org.apache.lucene.index.DocInverterPerField.processFields(DocInverterPerField.java:196)
        at org.apache.lucene.index.DocFieldProcessorPerThread.processDocument(DocFieldProcessorPerThread.java:246)
        at org.apache.lucene.index.DocumentsWriter.updateDocument(DocumentsWriter.java:774)
        at org.apache.lucene.index.DocumentsWriter.addDocument(DocumentsWriter.java:752)
        at org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1932)
        at org.elasticsearch.index.engine.robin.RobinEngine.create(RobinEngine.java:191)
        at org.elasticsearch.index.shard.service.InternalIndexShard.innerCreate(InternalIndexShard.java:222)
        at org.elasticsearch.index.shard.service.InternalIndexShard.create(InternalIndexShard.java:210)
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:127)
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:56)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction.performOnPrimary(TransportShardReplicationOperationAction.java:328)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction.access$400(TransportShardReplicationOperationAction.java:198)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1.run(TransportShardReplicationOperationAction.java:252)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
[11:19:28,071][WARN ][jgroups.FD               ] I was suspected by k04-49735; ignoring the SUSPECT message and sending back a HEARTBEAT_ACK
[11:19:42,368][WARN ][monitor.jvm              ] [Gaia] Long GC collection occurred, took [18.7s], breached threshold [10s]


After this, more and more of the same messages appear.


Here is a sample of the app that does the indexing:

################################################################################
import time
import dataGenerator   # local helper module used for test-data generation
# getNodeList(), getRandomData() and the ElasticSearch client class are
# defined elsewhere in the script.

def createDocuments(a_index, a_type, a_cfg):
    dGen = dataGenerator.dataGenerator()
    dGen.init('data/fields.txt')
    nodes    = getNodeList(a_cfg)
    node     = dGen.getRandomListMember(nodes)   # pick a random node to index against
    address  = node['address']
    port     = node['port']
    url      = address + ':' + str(port)
    numOfDoc = int(a_cfg.getNode('create/indexes/numOfDocuments').text)
    numOfRet = int(a_cfg.getNode('create/indexes/numOfRetries').text)

    idx = ElasticSearch(url)

    success = 0
    errors  = 0
    startTime = time.time()
    for i in range(0, numOfDoc):
        data = getRandomData(dGen)
        numOfRetries = 0
        while 1:
            try:
                ret = idx.index(data, a_index, a_type)
                if ret.has_key('ok') and ret['ok'] == True:
                    success += 1
                else:
                    #print 'Error: ' + str(ret)
                    errors += 1
                break
            except:
                #print 'An error has occurred, retrying...'
                if numOfRetries == numOfRet:
                    #print 'Unable to recover after ' + str(numOfRet) + ' retries.'
                    errors += 1   # count documents that never made it in
                    break
                numOfRetries += 1

    endTime = time.time()
    totalTime = endTime - startTime
    print 'Generated: ' + str(success) + ' records, errors: ' + str(errors) + ', time: ' + time.strftime('%M:%S', time.localtime(totalTime))
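
For completeness, this is roughly how the function gets called from the test
driver (the index and type names below are taken from the log above; loadConfig
and the cfg object layout are just an illustration, not the actual driver code):

# Illustrative driver call - 'loadConfig' is a placeholder for however the
# config object exposing getNode('create/indexes/...') is actually loaded.
cfg = loadConfig('data/config.xml')
createDocuments('users0', 'id120071', cfg)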

Thanks
Zaharije

Re: Memory leak while indexing data ...

kimchy
Administrator
For some reason this got caught in the Google Groups spam filter, but I see you managed to post it, so I will answer in the other thread...

On Tue, Jun 29, 2010 at 3:41 PM, Zaharije <[hidden email]> wrote:
